scheduler.periodic 源代码

from typing import Callable
from dataclasses import dataclass


__all__ = ['periodical']


@dataclass
class PeriodicalJob:
    """
    Wrap a function as a periodical job.
    Notice that it is not a callable.
    """
    function: Callable[..., None]
    job_id: str
    trigger: str
    tg_args: dict[str, int]


_periodical_jobs: list[PeriodicalJob] = []


[文档] def periodical(trigger: str, job_id: str = '', **trigger_args): """Wrap a function into a periodical job. If `job_id` is not provided, use function name. :param trigger: 'cron' or 'interval' :type trigger: str """ def wrapper(fn: Callable[..., None]): _job = PeriodicalJob(fn, job_id or fn.__name__, trigger, trigger_args) _periodical_jobs.append(_job) return fn return wrapper