ApschedulerObject.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import time
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. from django_apscheduler.jobstores import DjangoJobStore
  4. from django_apscheduler.models import DjangoJob
  5. import datetime
  6. import pytz
  class ApschedulerObject:
      """Convenience wrapper around a ``BackgroundScheduler`` using ``DjangoJobStore``.

      Creating an instance builds a scheduler in a fixed-offset time zone,
      registers a ``DjangoJobStore`` under the alias ``'default'`` and starts
      the scheduler right away.

      NOTE(review): every instance starts its own scheduler against the same
      job store; presumably only one instance is created per process -- confirm.
      """

      def __init__(self, timezone_offset=0.00):
          """
          Build and start the scheduler.
          @param timezone_offset: offset from UTC in hours (may be fractional,
              e.g. ``5.5``); converted to whole minutes for ``pytz.FixedOffset``.
          """
          # Compute the time zone offset in minutes. round() instead of plain
          # int() truncation so floating-point error cannot drop a minute.
          timezone_offset_minutes = int(round(timezone_offset * 60))
          timezone = pytz.FixedOffset(timezone_offset_minutes)
          # Kept so Unix timestamps can be converted in the same zone the
          # scheduler was configured with (see _aware_datetime).
          self.timezone = timezone
          self.scheduler = BackgroundScheduler(timezone=timezone)
          self.scheduler.add_jobstore(DjangoJobStore(), 'default')
          self.scheduler.start()

      def _aware_datetime(self, time_stamp):
          """
          Convert a Unix timestamp into a time-zone-aware datetime.
          @param time_stamp: seconds since the epoch.
          @return: aware ``datetime`` expressed in ``self.timezone``.
          """
          # A naive fromtimestamp() result is in the *server's* local zone and
          # would be reinterpreted by the scheduler in its own zone, shifting
          # the job whenever the two zones differ. An aware value is unambiguous.
          return datetime.datetime.fromtimestamp(time_stamp, tz=self.timezone)

      @staticmethod
      def auto_hello(x):
          """
          Sample task: print a greeting with ``x`` and the current Unix time.
          @param x: value echoed in the printed message.
          """
          now_time = time.time()
          print('hello world: {} {}'.format(x, now_time))

      def create_cron_job(self, func, task_id, day_of_week, hour, minute, args, second=0):
          """
          Create (or replace) a recurring cron job.
          @param func: callable to run.
          @param task_id: job id; an existing job with this id is replaced.
          @param day_of_week: cron ``day_of_week`` field.
          @param hour: cron ``hour`` field.
          @param minute: cron ``minute`` field.
          @param args: positional arguments passed to ``func``.
          @param second: cron ``second`` field, defaults to 0.
          @return: the job object returned by ``add_job``.
          """
          job = self.scheduler.add_job(
              func=func,
              trigger='cron',
              day_of_week=day_of_week,
              hour=hour,
              minute=minute,
              second=second,
              replace_existing=True,
              id=task_id,
              max_instances=1,
              coalesce=False,
              args=args,
              misfire_grace_time=300,
          )
          print(job)
          return job

      def create_interval_job(self, func, task_id, minutes, start_time, end_time, args):
          """
          Create (or replace) a job that runs every ``minutes`` minutes.
          @param func: callable to run.
          @param task_id: job id; an existing job with this id is replaced.
          @param minutes: interval between runs, in minutes.
          @param start_time: Unix timestamp at which the interval starts.
          @param end_time: Unix timestamp after which the job stops.
          @param args: positional arguments passed to ``func``.
          @return: the job object returned by ``add_job``.
          """
          return self.scheduler.add_job(
              func=func,
              trigger='interval',
              minutes=minutes,
              start_date=self._aware_datetime(start_time),
              end_date=self._aware_datetime(end_time),
              replace_existing=True,
              id=task_id,
              max_instances=1,
              coalesce=False,
              args=args,
          )

      def create_date_job(self, func, task_id, time_stamp, args):
          """
          Create (or replace) a one-off job that runs at a point in time.
          @param func: callable to run.
          @param task_id: job id; an existing job with this id is replaced.
          @param time_stamp: Unix timestamp at which to run the job.
          @param args: positional arguments passed to ``func``.
          @return: the job object returned by ``add_job``.
          """
          return self.scheduler.add_job(
              func=func,
              trigger='date',
              run_date=self._aware_datetime(time_stamp),
              replace_existing=True,
              id=task_id,
              max_instances=1,
              coalesce=False,
              args=args,
          )

      @staticmethod
      def del_job(task_id):
          """
          Delete a job by removing its ``DjangoJob`` row(s) through the ORM.
          @param task_id: id of the job to delete.
          """
          # NOTE(review): this goes straight to the database rather than via
          # scheduler.remove_job(); kept static so existing callers still work.
          DjangoJob.objects.filter(id=task_id).delete()

      def pause_job(self, task_id):
          """
          Pause a job.
          @param task_id: id of the job to pause.
          """
          self.scheduler.pause_job(task_id)

      def resume_job(self, task_id):
          """
          Resume a paused job.
          @param task_id: id of the job to resume.
          """
          self.scheduler.resume_job(task_id)