ApschedulerObject.py 1.2 KB

123456789101112131415161718192021222324252627282930
  1. import time
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. from django_apscheduler.jobstores import DjangoJobStore
  4. from Ansjer.config import LOGGER
  5. from django_apscheduler.models import DjangoJob
  6. import datetime
  7. class ApschedulerObject:
  8. def __init__(self):
  9. self.scheduler = BackgroundScheduler()
  10. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  11. self.scheduler.start()
  12. @staticmethod
  13. def auto_hello(): # 任务
  14. now_time = time.time()
  15. print('hello world:[{}]'.format(now_time))
  16. def cron_job(self, task_id, day_of_week, hour, minute): # 周期任务
  17. self.scheduler.add_job(self.auto_hello, 'cron', day_of_week=day_of_week, hour=hour, minute=minute,
  18. replace_existing=True, id=task_id, max_instances=1, coalesce=True)
  19. def date_job(self, task_id, time_stamp): # 时间点任务
  20. self.scheduler.add_job(self.auto_hello, 'date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  21. replace_existing=True, id=task_id, max_instances=1, coalesce=True)
  22. @staticmethod
  23. def del_job(task_id): # 删除任务
  24. DjangoJob.objects.filter(id=task_id).delete()