ApschedulerObject.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. import socket
  2. import time
  3. from apscheduler.schedulers.background import BackgroundScheduler
  4. from django_apscheduler.jobstores import DjangoJobStore
  5. from django_apscheduler.models import DjangoJob
  6. import datetime
  7. import pytz
  8. class ApschedulerObject:
  9. def __init__(self, timezone_offset=0.00):
  10. try:
  11. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  12. sock.bind(('127.0.0.1', 12345))
  13. except socket.error:
  14. pass
  15. else:
  16. # 计算时区偏移量(以分钟为单位)
  17. timezone_offset_minutes = int(timezone_offset * 60)
  18. timezone = pytz.FixedOffset(timezone_offset_minutes)
  19. self.scheduler = BackgroundScheduler(timezone=timezone)
  20. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  21. self.scheduler.start()
  22. @staticmethod
  23. def auto_hello(x): # 任务
  24. now_time = time.time()
  25. print('hello world: {} {}'.format(x, now_time))
  26. def create_cron_job(self, func, task_id, day_of_week, hour, minute, args, second=0): # 周期任务
  27. # day_of_week: 0,1,2,3,4,5,6对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
  28. self.scheduler.add_job(func=func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  29. second=second, replace_existing=True, id=task_id, max_instances=1, coalesce=False,
  30. args=args, misfire_grace_time=300)
  31. def create_interval_job(self, func, task_id, minutes, start_time, end_time, args): # 间隔任务
  32. self.scheduler.add_job(func=func, trigger='interval', minutes=minutes,
  33. start_date=datetime.datetime.fromtimestamp(start_time),
  34. end_date=datetime.datetime.fromtimestamp(end_time),
  35. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  36. def create_date_job(self, func, task_id, time_stamp, args):
  37. """
  38. 创建时间点任务
  39. @param func:
  40. @param task_id:
  41. @param time_stamp:
  42. @param args:
  43. @return:
  44. """
  45. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  46. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  47. @staticmethod
  48. def del_job(task_id): # 删除任务
  49. DjangoJob.objects.filter(id=task_id).delete()
  50. def pause_job(self, task_id): # 暂停任务
  51. self.scheduler.pause_job(task_id)
  52. def resume_job(self, task_id): # 恢复任务
  53. self.scheduler.resume_job(task_id)