# ApschedulerObject.py
import datetime
import time

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob

from Ansjer.config import LOGGER
  8. class ApschedulerObject:
  9. def __init__(self):
  10. self.scheduler = BackgroundScheduler()
  11. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  12. self.scheduler.start()
  13. @staticmethod
  14. def auto_hello(): # 任务
  15. now_time = time.time()
  16. print('hello world:[{}]'.format(now_time))
  17. def create_cron_job(self, func, task_id, day_of_week, hour, minute, args): # 周期任务
  18. self.scheduler.add_job(func=func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  19. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  20. def create_interval_job(self, func, task_id, minutes, start_time, end_time, args): # 间隔任务
  21. self.scheduler.add_job(func=func, trigger='interval', minutes=minutes,
  22. start_date=datetime.datetime.fromtimestamp(start_time),
  23. end_date=datetime.datetime.fromtimestamp(end_time),
  24. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  25. def create_date_job(self, func, task_id, time_stamp, args):
  26. """
  27. 创建时间点任务
  28. @param func:
  29. @param task_id:
  30. @param time_stamp:
  31. @param args:
  32. @return:
  33. """
  34. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  35. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  36. @staticmethod
  37. def del_job(task_id): # 删除任务
  38. DjangoJob.objects.filter(id=task_id).delete()