extern.multithread 源代码

from typing import Callable, ParamSpec, Any
from datetime import datetime, timedelta

from extern.config import wechat_config as CONFIG
from scheduler.adder import ScheduleAdder


__all__ = [
    'scheduler_enabled',
    'get_caller',
]


[文档] def scheduler_enabled(multithread: bool = True) -> bool: '''判断定时任务是否可用''' return multithread and CONFIG.multithread
P = ParamSpec('P')
[文档] def get_caller(func: Callable[P, None], *, multithread: bool = True, run_time: datetime | timedelta | None = None, job_id: str | None = None, replace: bool = True) -> Callable[P, None]: '''获取函数的调用者''' if not scheduler_enabled(multithread): return func adder = ScheduleAdder(func, run_time=run_time, id=job_id, replace=replace) # 不应使用返回值,但要确保调用参数正确 adder: Callable[P, Any] return adder # type: ignore