ApschedulerObject.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import time
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. from django_apscheduler.jobstores import DjangoJobStore
  4. from django_apscheduler.models import DjangoJob
  5. import datetime
  6. import pytz
  7. class ApschedulerObject:
  8. def __init__(self, timezone_offset=0.00):
  9. # 计算时区偏移量(以分钟为单位)
  10. timezone_offset_minutes = int(timezone_offset * 60)
  11. timezone = pytz.FixedOffset(timezone_offset_minutes)
  12. self.scheduler = BackgroundScheduler(timezone=timezone)
  13. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  14. self.scheduler.start()
  15. @staticmethod
  16. def auto_hello(x): # 任务
  17. now_time = time.time()
  18. print('hello world: {} {}'.format(x, now_time))
  19. def create_cron_job(self, func, task_id, day_of_week, hour, minute, args, second=0): # 周期任务
  20. # day_of_week: 0,1,2,3,4,5,6对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
  21. self.scheduler.add_job(func=func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  22. second=second, replace_existing=True, id=task_id, max_instances=1, coalesce=False,
  23. args=args, misfire_grace_time=300)
  24. def create_interval_job(self, func, task_id, minutes, start_time, end_time, args): # 间隔任务
  25. self.scheduler.add_job(func=func, trigger='interval', minutes=minutes,
  26. start_date=datetime.datetime.fromtimestamp(start_time),
  27. end_date=datetime.datetime.fromtimestamp(end_time),
  28. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  29. def create_date_job(self, func, task_id, time_stamp, args):
  30. """
  31. 创建时间点任务
  32. @param func:
  33. @param task_id:
  34. @param time_stamp:
  35. @param args:
  36. @return:
  37. """
  38. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  39. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  40. @staticmethod
  41. def del_job(task_id): # 删除任务
  42. DjangoJob.objects.filter(id=task_id).delete()
  43. def pause_job(self, task_id): # 暂停任务
  44. self.scheduler.pause_job(task_id)
  45. def resume_job(self, task_id): # 恢复任务
  46. self.scheduler.resume_job(task_id)