extern.wechat 源代码

'''
wechat.py

集合发送到微信的函数

- 可导出函数可以假设是异步IO,参数符合条件时不抛出异常,具体情况见配置文件
- 异步时,函数只返回尝试状态,即是否设置了定时任务,不保证成功发送
- _开头的函数是私有函数,子模块不应调用
'''
import requests
import json
from typing import Iterable, Callable, Any
from datetime import datetime, timedelta

from extern.config import wechat_config as CONFIG
from extern.multithread import get_caller, scheduler_enabled
from extern.log import ExternLogger
from utils.http.utils import build_full_url


__all__ = [
    'send_wechat',
    'send_verify_code',
    'invite_to_wechat',
]


# 全局变量 用来发送和确认默认的导航网址
DEFAULT_URL = build_full_url('/')
logger = ExternLogger.getLogger('wechat')


def _get_available_users(users: Iterable[str | int]) -> list[str]:
    _users = map(str, users)
    _users = set(_users)
    if CONFIG.receivers is not None:
        _users &= CONFIG.receivers
    if CONFIG.blacklist:
        _users -= CONFIG.blacklist
    _users = sorted(_users)
    return _users


ParseResult = tuple[str | None, list[str] | None]
def _post_and_parse(
    post_url: str,
    post_data: dict[str, Any],
    timeout: int | float,
    detail_parser: Callable[[Any], ParseResult] | None = None,
) -> ParseResult:
    '''
    发送post请求并解析回应,返回解析结果,解析结果

    Args:
        post_url(str): 请求的url
        post_data(dict): 请求的数据
        timeout(int | float): 超时时间
        detail_parser(Callable[[Any], ParseResult], optional): 解析回应中细节部分的函数
    
    Returns:
        parseResult: 解析结果,包含错误信息(成功时为None)和建议重发的失败用户列表
    '''
    try: _post_data = json.dumps(post_data)
    except: return "JSON编码失败", None
    try: raw_response = requests.post(post_url, _post_data, timeout=timeout)
    except: return "连接API失败", None
    try: response: dict[str, Any] = raw_response.json()
    except: return "JSON解析失败", None
    try:
        if response["status"] == 200:           # 全部发送成功
            return None, []
        datas: dict = response["data"]
        if datas.get("detail"):                 # 部分发送失败
            assert detail_parser is not None
            return detail_parser(datas["detail"])
        errmsg: str = datas["errMsg"]           # 参数等其他传入格式问题
        return errmsg, None
    except:
        return "回应解析失败", None


def _log_users(users: list[str]) -> str:
    if len(users) <= 10:
        return ', '.join(users)
    user_display = users[:3] + ['...'] + users[-3:]
    return ', '.join(user_display) + f'等{len(users)}用户'


def _send_wechat(
    users: list[str],
    content: str,
    api_url: str,
    card: bool = True,
    url: str | None = None,
    btntxt: str | None = None,
    *,
    retry_times: int = 1,
):
    """底层实现发送到微信,是为了方便设置定时任务"""
    post_data = {
        "touser": users,
        "content": content,
        "toall": True,
        "secret": CONFIG.hasher.encode(content),
    }
    if card:
        post_data["card"] = True
        if url is not None:
            post_data["url"] = url
        if btntxt is not None:
            post_data["btntxt"] = btntxt

    def _parser(detail: list[tuple[str, str]]) -> ParseResult:
        retrys = [x[0] for x in detail]
        errmsg = detail[0][1]             # 失败原因基本相同,取一个即可
        return errmsg, retrys

    for i in range(retry_times):
        errmsg, retrys = _post_and_parse(api_url, post_data, CONFIG.timeout, _parser)
        if errmsg is None:
            logger.info(f"成功向{_log_users(users)}发送消息")
            break
        if retrys is None:
            logger.warning(f"向{_log_users(users)}发送消息失败:{errmsg}")
            break
        post_data["touser"] = retrys
        logger.warning(f"向{_log_users(users)}发送时,{_log_users(retrys)}失败:{errmsg}")


[文档] def send_wechat( users: Iterable[str | int], title: str, message: str, card: bool = True, url: str | None = None, btntxt: str | None = None, *, api_path: str = '', default: bool = True, retry_times: int = 1, multithread: bool = True, run_time: datetime | timedelta | None = None, task_id: str | None = None, ): """ 附带了去重、多线程和batch的发送;不应被服务直接调用 参数 -------- - users(Iterable[str | int]): 用户列表 - title(str): 标题 - message(str): 内容,可为空字符串 - card(bool): 发送文本卡片,建议message长度小于120时开启 - url(str, optional): 文本卡片的链接,相对路径会被转换为绝对路径 - btntxt(str, optional): 文本卡片的提示短语,不超过4个字 - 仅关键字参数 - api_path(str, optional): API路径,相对路径会被转换为绝对路径 - default(bool, optional): 填充默认值 - retry_times(int, optional): 重试次数 - multithread(bool, optional): 使用多线程(需要启用多线程),不堵塞当前线程 - run_time(datetime | timedelta, optional): 执行时间,时间或延迟 - task_id(str, optional): 任务标识符,与定时任务标识符一致 异常 -------- - RuntimeError: 定时任务未启用时,设置定时发送时间 """ users = _get_available_users(users) if not users: return logger.warning('没有可用用户') content = f'{title}<title>{message}' if card: if url is not None: url = build_full_url(url) elif default: url = DEFAULT_URL if btntxt is None and default: btntxt = "详情" if not CONFIG.retry: retry_times = 1 if run_time is not None and not scheduler_enabled(multithread): if isinstance(run_time, datetime): _schedule_time = run_time.strftime('%Y-%m-%d %H:%M:%S') _schedule_time = f'计划于{_schedule_time}执行' else: _schedule_time = f'延迟{run_time}执行' logger.error(f'无法设置{_schedule_time}的任务{task_id}:定时任务未启用') raise RuntimeError('定时任务未启用') total_ct = len(users) caller = get_caller(_send_wechat, multithread=multithread, job_id=task_id, run_time=run_time) for i in range(0, total_ct, CONFIG.send_batch): userids = users[i : i + CONFIG.send_batch] caller( userids, content, build_full_url(api_path, CONFIG.api_url), card=card, url=url, btntxt=btntxt, retry_times=retry_times, )
[文档] def send_verify_code(stu_id: str | int, captcha: str, url: str | None = '/forgetpw/'): time = datetime.now().strftime('%m月%d日 %H:%M:%S') message = ( "您的账号正在进行企业微信验证\n本次请求的验证码为:" f"<div class=\"highlight\">{captcha}</div>" f"发送时间:{time}" ) url = build_full_url(url) if url is not None else None btntxt = "登录" if url is not None else None send_wechat([stu_id], 'YPPF登录验证', message, card=True, url=url, btntxt=btntxt, task_id=f'wechat_verify: {stu_id}')
def _invite_to_wechat(stu_id: str, retry_times: int = 1): INVITE_URL = build_full_url('/invite_user', CONFIG.api_url) post_data = { "user": stu_id, "secret": CONFIG.hasher.encode(stu_id), } def _parser(detail: str) -> ParseResult: return detail, [stu_id] for i in range(retry_times): errmsg, retrys = _post_and_parse(INVITE_URL, post_data, CONFIG.timeout, _parser) if errmsg is None: logger.info(f"成功向{stu_id}发送邀请") break logger.warning(f"向{stu_id}发送邀请失败:{errmsg}") if retrys is None: break
[文档] def invite_to_wechat(stu_id: str | int, retry_times: int = 3, *, multithread: bool = True): caller = get_caller(_invite_to_wechat, multithread=multithread) caller(str(stu_id), retry_times=retry_times)