# Source code for scheduler.management.commands.collect_jobs

from importlib import import_module

from django.apps import apps
from django.core.management.base import BaseCommand

from scheduler.scheduler import scheduler
from scheduler.periodic import _periodical_jobs


class Command(BaseCommand):
    """Management command that registers periodical jobs with the scheduler.

    For every installed Django app it tries to import ``<app name>.jobs`` and
    then adds every entry of ``_periodical_jobs`` to ``scheduler``.
    """

    help = "Register periodical jobs."

    def handle(self, *args, **options):
        """Import each app's ``jobs`` module and register the collected jobs.

        Apps that have no ``jobs`` module are skipped silently. A ``jobs``
        module that exists but fails to import is reported on stderr and
        skipped, so one broken app does not stop the remaining apps from
        being processed.

        Args:
            *args: Positional arguments passed by Django; unused.
            **options: Parsed command-line options; unused.
        """
        for app in apps.get_app_configs():
            job_module_path = app.name + '.jobs'
            try:
                # Imported for its side effects only.
                # NOTE(review): presumably importing the module is what
                # populates _periodical_jobs -- confirm in scheduler.periodic.
                import_module(job_module_path)
            except Exception as exc:
                # Only "this app has no jobs module" is an expected, quiet
                # case; anything else (syntax error, failing nested import,
                # ...) is a real problem worth surfacing.
                jobs_module_missing = (
                    isinstance(exc, ModuleNotFoundError)
                    and exc.name == job_module_path
                )
                if not jobs_module_missing:
                    self.stderr.write(
                        f'Could not import {job_module_path}: {exc!r}'
                    )
                continue

            self.stdout.write(
                f'Looking for periodical jobs in {job_module_path}'
            )
            # The whole registry is walked after every successful import, so
            # entries seen for earlier apps are passed to add_job again with
            # replace_existing=True.
            for pjob in _periodical_jobs:
                job_id, fn, trigger = pjob.job_id, pjob.function, pjob.trigger
                self.stdout.write(
                    f'\t{job_id} (fn: {fn.__name__}, trigger: {trigger})'
                )
                scheduler.add_job(
                    fn,
                    trigger,
                    id=job_id,
                    replace_existing=True,
                    **pjob.tg_args,
                )