scheduler.management.commands.runscheduler 源代码

from apscheduler.schedulers.background import BackgroundScheduler
from django.conf import settings
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
import rpyc
from rpyc.utils.server import ThreadedServer

from record.log.utils import get_logger
from scheduler.config import scheduler_config as CONFIG
from utils.health_check import db_connection_healthy

TZ = settings.TIME_ZONE

logger = get_logger('apscheduler')


[文档] class SchedulerService(rpyc.Service): def __init__(self, scheduler: BackgroundScheduler): # It is OK to pass an instance of service to Threadserver now self.scheduler = scheduler
[文档] def exposed_wakeup(self): return self.scheduler.wakeup()
[文档] def exposed_health_check(self) -> bool: return db_connection_healthy() and self.scheduler.running
[文档] class Command(BaseCommand): help = "Runs apscheduler."
[文档] def handle(self, *args, **options): scheduler = BackgroundScheduler(timezone=TZ) scheduler.add_jobstore(DjangoJobStore(), "default") scheduler.start() protocol_config = { 'allow_all_attrs': True, 'logger': logger, } server = ThreadedServer(SchedulerService(scheduler), port=CONFIG.rpc_port, protocol_config=protocol_config) try: logger.info('Starting scheduler with executor') server.start() except (KeyboardInterrupt, SystemExit): pass finally: scheduler.shutdown()