scheduler.management.commands.runscheduler 源代码
from apscheduler.schedulers.background import BackgroundScheduler
from django.conf import settings
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
import rpyc
from rpyc.utils.server import ThreadedServer
from record.log.utils import get_logger
from scheduler.config import scheduler_config as CONFIG
from utils.health_check import db_connection_healthy
TZ = settings.TIME_ZONE
logger = get_logger('apscheduler')
[文档]
class SchedulerService(rpyc.Service):
def __init__(self, scheduler: BackgroundScheduler):
# It is OK to pass an instance of service to Threadserver now
self.scheduler = scheduler
[文档]
def exposed_wakeup(self):
return self.scheduler.wakeup()
[文档]
def exposed_health_check(self) -> bool:
return db_connection_healthy() and self.scheduler.running
[文档]
class Command(BaseCommand):
help = "Runs apscheduler."
[文档]
def handle(self, *args, **options):
scheduler = BackgroundScheduler(timezone=TZ)
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.start()
protocol_config = {
'allow_all_attrs': True,
'logger': logger,
}
server = ThreadedServer(SchedulerService(scheduler),
port=CONFIG.rpc_port,
protocol_config=protocol_config)
try:
logger.info('Starting scheduler with executor')
server.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
scheduler.shutdown()