import time from apscheduler.schedulers.background import BackgroundScheduler from django_apscheduler.jobstores import DjangoJobStore from Ansjer.config import LOGGER from django_apscheduler.models import DjangoJob import datetime class ApschedulerObject: def __init__(self): self.scheduler = BackgroundScheduler() self.scheduler.add_jobstore(DjangoJobStore(), 'default') self.scheduler.start() @staticmethod def auto_hello(): # 任务 now_time = time.time() print('hello world:[{}]'.format(now_time)) def cron_job(self, task_id, day_of_week, hour, minute): # 周期任务 self.scheduler.add_job(self.auto_hello, 'cron', day_of_week=day_of_week, hour=hour, minute=minute, replace_existing=True, id=task_id, max_instances=1, coalesce=True) def date_job(self, task_id, time_stamp): # 时间点任务 self.scheduler.add_job(self.auto_hello, 'date', run_date=datetime.datetime.fromtimestamp(time_stamp), replace_existing=True, id=task_id, max_instances=1, coalesce=True) @staticmethod def del_job(task_id): # 删除任务 DjangoJob.objects.filter(id=task_id).delete()