ApschedulerObject.py 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import socket
  2. import time
  3. from apscheduler.schedulers.background import BackgroundScheduler
  4. from django_apscheduler.jobstores import DjangoJobStore
  5. from django_apscheduler.models import DjangoJob
  6. import datetime
  7. import pytz
  8. class ApschedulerObject:
  9. def __init__(self, timezone_offset=0.00):
  10. # 计算时区偏移量(以分钟为单位)
  11. timezone_offset_minutes = int(timezone_offset * 60)
  12. timezone = pytz.FixedOffset(timezone_offset_minutes)
  13. self.scheduler = BackgroundScheduler(timezone=timezone)
  14. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  15. self.scheduler.start()
  16. @staticmethod
  17. def auto_hello(x): # 任务
  18. now_time = time.time()
  19. print('hello world: {} {}'.format(x, now_time))
  20. def create_cron_job(self, func, task_id, day_of_week, hour, minute, args, second=0): # 周期任务
  21. # day_of_week: 0,1,2,3,4,5,6对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
  22. self.scheduler.add_job(func=func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  23. second=second, replace_existing=True, id=task_id, max_instances=1, coalesce=False,
  24. args=args, misfire_grace_time=300)
  25. def create_interval_job(self, func, task_id, minutes, start_time, end_time, args): # 间隔任务
  26. self.scheduler.add_job(func=func, trigger='interval', minutes=minutes,
  27. start_date=datetime.datetime.fromtimestamp(start_time),
  28. end_date=datetime.datetime.fromtimestamp(end_time),
  29. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  30. def create_date_job(self, func, task_id, time_stamp, args):
  31. """
  32. 创建时间点任务
  33. @param func:
  34. @param task_id:
  35. @param time_stamp:
  36. @param args:
  37. @return:
  38. """
  39. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  40. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  41. @staticmethod
  42. def del_job(task_id): # 删除任务
  43. DjangoJob.objects.filter(id=task_id).delete()
  44. def pause_job(self, task_id): # 暂停任务
  45. self.scheduler.pause_job(task_id)
  46. def resume_job(self, task_id): # 恢复任务
  47. self.scheduler.resume_job(task_id)
  48. class ApschedulerObjectTest:
  49. def __init__(self, func, task_id, time_stamp, args, timezone_offset=0.00):
  50. try:
  51. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  52. sock.bind(('127.0.0.1', 12345))
  53. except Exception:
  54. pass
  55. else:
  56. # 计算时区偏移量(以分钟为单位)
  57. timezone_offset_minutes = int(timezone_offset * 60)
  58. timezone = pytz.FixedOffset(timezone_offset_minutes)
  59. self.scheduler = BackgroundScheduler(timezone=timezone)
  60. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  61. self.create_date_job(func, task_id, time_stamp, args)
  62. self.scheduler.start()
  63. def create_date_job(self, func, task_id, time_stamp, args):
  64. """
  65. 创建时间点任务
  66. @param func:
  67. @param task_id:
  68. @param time_stamp:
  69. @param args:
  70. @return:
  71. """
  72. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  73. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)