CeleryBeatObject.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # @Author : Rocky
  2. # @File : CeleryBeatObject.py
  3. # @Time : 2024/3/21 16:35
  4. import json
  5. import zoneinfo
  6. from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule
  7. from Model.models import TimeZoneInfo
  8. from Service.CommonService import CommonService
  9. class CeleryBeatObj:
  10. # celery beat对象,封装定时任务函数
  11. # https://github.com/celery/django-celery-beat
  12. @staticmethod
  13. def creat_interval_task(every, period, name, task, args=None, kwargs=None):
  14. """
  15. 创建间隔任务
  16. @param every: 时间间隔
  17. @param period: 单位,seconds,minutes,hours,days(或IntervalSchedule.SECONDS...)
  18. @param name: 任务名称
  19. @param task: 任务函数
  20. @param args: 参数
  21. @param kwargs: 参数
  22. @return:
  23. """
  24. if args is None:
  25. args = []
  26. args = json.dumps(args)
  27. if kwargs is None:
  28. kwargs = {}
  29. kwargs = json.dumps(kwargs)
  30. schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
  31. PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)
  32. @staticmethod
  33. def creat_clocked_task(timezone_offset, time_string, name, task, args=None, kwargs=None):
  34. """
  35. 创建定时任务
  36. @param timezone_offset: 时区偏移量
  37. @param time_string: 时间字符串
  38. @param name: 任务名称
  39. @param task: 任务函数
  40. @param args: 参数
  41. @param kwargs: 参数
  42. @return:
  43. """
  44. if args is None:
  45. args = []
  46. args = json.dumps(args)
  47. if kwargs is None:
  48. kwargs = {}
  49. kwargs = json.dumps(kwargs)
  50. time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
  51. clocked_time = CommonService.get_date_from_timestamp(time_stamp, timezone_offset)
  52. schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
  53. PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)
  54. @staticmethod
  55. def creat_crontab_task(
  56. timezone_offset, name, task,
  57. minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', args=None, kwargs=None):
  58. """
  59. 创建周期任务
  60. @param timezone_offset: 时区偏移量
  61. @param name: 任务名称
  62. @param task: 任务函数
  63. @param minute: 分
  64. @param hour: 时
  65. @param day_of_week: 周,1-7对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
  66. @param day_of_month: 月
  67. @param month_of_year: 年
  68. @param args: 参数
  69. @param kwargs: 参数
  70. @return:
  71. """
  72. if args is None:
  73. args = []
  74. args = json.dumps(args)
  75. if kwargs is None:
  76. kwargs = {}
  77. kwargs = json.dumps(kwargs)
  78. time_zone_info_qs = TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')
  79. if time_zone_info_qs.exists():
  80. zone_info = time_zone_info_qs[0]['zone_info']
  81. timezone = zoneinfo.ZoneInfo(zone_info)
  82. schedule, _ = CrontabSchedule.objects.get_or_create(
  83. minute=minute,
  84. hour=hour,
  85. day_of_week=day_of_week,
  86. day_of_month=day_of_month,
  87. month_of_year=month_of_year,
  88. timezone=timezone)
  89. PeriodicTask.objects.create(crontab=schedule, name=name, task=task, args=args, kwargs=kwargs)
  90. @staticmethod
  91. def disable_task(name):
  92. """
  93. 暂停任务
  94. @param name: 任务名称
  95. @return:
  96. """
  97. periodic_task = PeriodicTask.objects.get(name=name)
  98. periodic_task.enabled = False
  99. periodic_task.save()
  100. @staticmethod
  101. def enable_task(name):
  102. """
  103. 恢复任务
  104. @param name: 任务名称
  105. @return:
  106. """
  107. periodic_task = PeriodicTask.objects.get(name=name)
  108. periodic_task.enabled = True
  109. periodic_task.save()
  110. @staticmethod
  111. def del_task(name):
  112. """
  113. 删除任务
  114. @param name: 任务名称
  115. @return:
  116. """
  117. PeriodicTask.objects.get(name=name).delete()