scheduler.scheduler 源代码
"""Scheduler for executing jobs in background
The scheduler is unique and should not export to other modules.
This module is no longer a exposed interface, use other modules instead.
Attributes:
scheduler (BackgroundScheduler): The scheduler instance,
treat it as a BackgroundScheduler
logger (Logger): Logger for scheduler
See Also:
- :module:`scheduler.adder`
- :module:`scheduler.periodic`
- :module:`scheduler.cancel`
"""
import six
from threading import Event
from functools import update_wrapper
import rpyc
from django.conf import settings
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from record.log.utils import get_logger
from scheduler.config import scheduler_config as CONFIG
# Custom handler
logger = get_logger('apscheduler')
[文档]
class Scheduler:
"""
A wrapper around `BackgroundScheduler`
It won't execute the job.
When adding the job to database, also try to wakeup the executor
"""
def __init__(self, scheduler: BackgroundScheduler, retry_times: int = 3):
self.wrapped_scheduler = scheduler
self.remote_scheduler: BackgroundScheduler | None = None
self.retry_times = retry_times
def __getattr__(self, name: str):
target_method = getattr(self.wrapped_scheduler, name)
def wrapper(*args, **kwargs):
val = target_method(*args, **kwargs)
self.wakeup_executor()
return val
update_wrapper(wrapper, target_method)
return wrapper
[文档]
def wakeup_executor(self):
for _ in range(self.retry_times):
if self._try_wakeup():
break
self.connect_remote()
if self.remote_scheduler is not None or self.connect_remote():
self._try_wakeup()
def _try_wakeup(self) -> bool:
try:
self.remote_scheduler.wakeup()
return True
except:
return False
[文档]
def connect_remote(self) -> bool:
try:
conn: rpyc.Connection = rpyc.connect(
"localhost", CONFIG.rpc_port,
config={"allow_all_attrs": True})
self.remote_scheduler = conn.root
return True
except Exception as e:
self.log_network_err(e)
return False
[文档]
def log_network_err(self, exc):
logger.exception(f'Remotely wakeup executor failed: {exc}')
[文档]
def start_scheduler() -> BackgroundScheduler:
"""Return a background scheduler that can add job to database,
but not actually run the job.
"""
scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler._event = Event() # type: ignore
with scheduler._jobstores_lock: # type: ignore
for alias, store in six.iteritems(scheduler._jobstores): # type: ignore
store.start(scheduler, alias)
scheduler.state = 1 # STATE_RUNNING
return scheduler
if CONFIG.use_scheduler:
scheduler: BackgroundScheduler = Scheduler(start_scheduler()) # type: ignore
else:
# Not start, no real_add_job
scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")