ApschedulerObject.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import socket
  2. import time
  3. from apscheduler.schedulers.background import BackgroundScheduler
  4. from django_apscheduler.jobstores import DjangoJobStore
  5. from django_apscheduler.models import DjangoJob
  6. import datetime
  7. import pytz
  8. class ApschedulerObject:
  9. def __init__(self, timezone_offset=0.00):
  10. try:
  11. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  12. sock.bind(('127.0.0.1', 12345))
  13. except Exception:
  14. pass
  15. else:
  16. # 计算时区偏移量(以分钟为单位)
  17. timezone_offset_minutes = int(timezone_offset * 60)
  18. timezone = pytz.FixedOffset(timezone_offset_minutes)
  19. self.scheduler = BackgroundScheduler(timezone=timezone)
  20. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  21. self.scheduler.start()
  22. @staticmethod
  23. def auto_hello(x): # 任务
  24. now_time = time.time()
  25. print('hello world: {} {}'.format(x, now_time))
  26. def create_cron_job(self, func, task_id, day_of_week, hour, minute, args, second=0): # 周期任务
  27. # day_of_week: 0,1,2,3,4,5,6对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
  28. self.scheduler.add_job(func=func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  29. second=second, replace_existing=True, id=task_id, max_instances=1, coalesce=False,
  30. args=args, misfire_grace_time=300)
  31. def create_interval_job(self, func, task_id, minutes, start_time, end_time, args): # 间隔任务
  32. self.scheduler.add_job(func=func, trigger='interval', minutes=minutes,
  33. start_date=datetime.datetime.fromtimestamp(start_time),
  34. end_date=datetime.datetime.fromtimestamp(end_time),
  35. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  36. def create_date_job(self, func, task_id, time_stamp, args):
  37. """
  38. 创建时间点任务
  39. @param func:
  40. @param task_id:
  41. @param time_stamp:
  42. @param args:
  43. @return:
  44. """
  45. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  46. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  47. @staticmethod
  48. def del_job(task_id): # 删除任务
  49. DjangoJob.objects.filter(id=task_id).delete()
  50. def pause_job(self, task_id): # 暂停任务
  51. self.scheduler.pause_job(task_id)
  52. def resume_job(self, task_id): # 恢复任务
  53. self.scheduler.resume_job(task_id)
  54. class ApschedulerObjectTest:
  55. def __init__(self, func, task_id, time_stamp, args, timezone_offset=0.00):
  56. try:
  57. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  58. sock.bind(('127.0.0.1', 12345))
  59. except Exception:
  60. pass
  61. else:
  62. # 计算时区偏移量(以分钟为单位)
  63. timezone_offset_minutes = int(timezone_offset * 60)
  64. timezone = pytz.FixedOffset(timezone_offset_minutes)
  65. self.scheduler = BackgroundScheduler(timezone=timezone)
  66. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  67. self.create_date_job(func, task_id, time_stamp, args)
  68. self.scheduler.start()
  69. def create_date_job(self, func, task_id, time_stamp, args):
  70. """
  71. 创建时间点任务
  72. @param func:
  73. @param task_id:
  74. @param time_stamp:
  75. @param args:
  76. @return:
  77. """
  78. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  79. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)