# ApschedulerObject.py
import datetime
import time

import rpyc
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob

from Ansjer.config import LOGGER
  10. class ApschedulerObject:
  11. def __init__(self):
  12. self.scheduler = BackgroundScheduler(
  13. executors={
  14. 'default': ThreadPoolExecutor(1) # 最多3个进程同时运行
  15. })
  16. self.scheduler.add_jobstore(DjangoJobStore(), 'default')
  17. self.scheduler.start()
  18. # self.conn = rpyc.connect('localhost', 12345)
  19. @staticmethod
  20. def auto_hello(): # 任务
  21. now_time = time.time()
  22. print('hello world:[{}]'.format(now_time))
  23. def create_cron_job(self, func, task_id, day_of_week, hour, minute, args): # 周期任务
  24. job = self.scheduler.add_job(func=func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  25. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args,
  26. misfire_grace_time=300)
  27. # job = self.conn.root.add_job(func, trigger='cron', day_of_week=day_of_week, hour=hour, minute=minute,
  28. # id=task_id, args=args, max_instances=1, coalesce=False, misfire_grace_time=300)
  29. print(job)
  30. def create_interval_job(self, func, task_id, minutes, start_time, end_time, args): # 间隔任务
  31. self.scheduler.add_job(func=func, trigger='interval', minutes=minutes,
  32. start_date=datetime.datetime.fromtimestamp(start_time),
  33. end_date=datetime.datetime.fromtimestamp(end_time),
  34. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  35. def create_date_job(self, func, task_id, time_stamp, args):
  36. """
  37. 创建时间点任务
  38. @param func:
  39. @param task_id:
  40. @param time_stamp:
  41. @param args:
  42. @return:
  43. """
  44. self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
  45. replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)
  46. @staticmethod
  47. def del_job(task_id): # 删除任务
  48. DjangoJob.objects.filter(id=task_id).delete()