CeleryBeatObject.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. # @Author : Rocky
  2. # @File : CeleryBeatObject.py
  3. # @Time : 2024/3/21 16:35
  4. import json
  5. import zoneinfo
  6. from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule
  7. from Model.models import TimeZoneInfo
  8. from Service.CommonService import CommonService
  9. from django.core.exceptions import ObjectDoesNotExist
  10. class CeleryBeatObj:
  11. # celery beat对象,封装定时任务函数
  12. # https://github.com/celery/django-celery-beat
  13. @staticmethod
  14. def creat_interval_task(every, period, name, task, args=None, kwargs=None):
  15. """
  16. 创建间隔任务
  17. @param every: 时间间隔
  18. @param period: 单位,seconds,minutes,hours,days(或IntervalSchedule.SECONDS...)
  19. @param name: 任务名称
  20. @param task: 任务函数
  21. @param args: 参数
  22. @param kwargs: 参数
  23. @return:
  24. """
  25. if args is None:
  26. args = []
  27. args = json.dumps(args)
  28. if kwargs is None:
  29. kwargs = {}
  30. kwargs = json.dumps(kwargs)
  31. schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
  32. PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)
  33. @staticmethod
  34. def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='', args=None, kwargs=None):
  35. """
  36. 创建定时任务
  37. @param name: 任务名称
  38. @param task: 任务函数
  39. @param time_stamp: 时间戳
  40. @param timezone_offset: 时区偏移量
  41. @param time_string: 时间字符串
  42. @param args: 参数
  43. @param kwargs: 参数
  44. @return:
  45. """
  46. if args is None:
  47. args = []
  48. args = json.dumps(args)
  49. if kwargs is None:
  50. kwargs = {}
  51. kwargs = json.dumps(kwargs)
  52. # 不传时间戳时,将时间字符串转为时间戳
  53. if time_stamp == 0:
  54. time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
  55. # 时间戳转为东八区的时间字符串
  56. clocked_time = CommonService.get_date_from_timestamp(time_stamp, 8)
  57. periodic_task = PeriodicTask.objects.filter(name=name)
  58. if periodic_task.exists():
  59. periodic_task_qs = periodic_task[0]
  60. clocked_id = periodic_task_qs.clocked_id
  61. ClockedSchedule.objects.filter(id=clocked_id).update(clocked_time=clocked_time)
  62. periodic_task_qs.task = task
  63. periodic_task_qs.args = args
  64. periodic_task_qs.kwargs = kwargs
  65. periodic_task_qs.save()
  66. else:
  67. schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
  68. PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)
  69. @staticmethod
  70. def creat_crontab_task(
  71. timezone_offset, name, task,
  72. minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', args=None, kwargs=None):
  73. """
  74. 创建周期任务
  75. @param timezone_offset: 时区偏移量
  76. @param name: 任务名称
  77. @param task: 任务函数
  78. @param minute: 分
  79. @param hour: 时
  80. @param day_of_week: 周,1-7对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
  81. @param day_of_month: 月
  82. @param month_of_year: 年
  83. @param args: 参数
  84. @param kwargs: 参数
  85. @return:
  86. """
  87. if args is None:
  88. args = []
  89. args = json.dumps(args)
  90. if kwargs is None:
  91. kwargs = {}
  92. kwargs = json.dumps(kwargs)
  93. time_zone_info_qs = TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')
  94. if time_zone_info_qs.exists():
  95. zone_info = time_zone_info_qs[0]['zone_info']
  96. timezone = zoneinfo.ZoneInfo(zone_info)
  97. schedule, _ = CrontabSchedule.objects.get_or_create(
  98. minute=minute,
  99. hour=hour,
  100. day_of_week=day_of_week,
  101. day_of_month=day_of_month,
  102. month_of_year=month_of_year,
  103. timezone=timezone)
  104. PeriodicTask.objects.create(crontab=schedule, name=name, task=task, args=args, kwargs=kwargs)
  105. @staticmethod
  106. def disable_task(name):
  107. """
  108. 暂停任务
  109. @param name: 任务名称
  110. @return:
  111. """
  112. try:
  113. periodic_task = PeriodicTask.objects.get(name=name)
  114. periodic_task.enabled = False
  115. periodic_task.save()
  116. except ObjectDoesNotExist:
  117. pass
  118. @staticmethod
  119. def enable_task(name):
  120. """
  121. 恢复任务
  122. @param name: 任务名称
  123. @return:
  124. """
  125. try:
  126. periodic_task = PeriodicTask.objects.get(name=name)
  127. periodic_task.enabled = True
  128. periodic_task.save()
  129. except ObjectDoesNotExist:
  130. pass
  131. @staticmethod
  132. def del_task(name):
  133. """
  134. 删除任务
  135. @param name: 任务名称
  136. @return:
  137. """
  138. IntervalSchedule.objects.filter(periodictask__name=name).delete()
  139. CrontabSchedule.objects.filter(periodictask__name=name).delete()
  140. ClockedSchedule.objects.filter(periodictask__name=name).delete()
  141. PeriodicTask.objects.filter(name=name).delete()
  142. @staticmethod
  143. def update_task(name, **kwargs):
  144. """
  145. 更新任务
  146. @param name: 任务名称
  147. @param kwargs: 需要更新的属性及对应的值(可选)
  148. - interval: (every, period) 或 IntervalSchedule 对象,用于更新间隔任务
  149. - clocked_time: str 或 datetime,用于更新定时任务的时间戳
  150. - crontab: (minute, hour, day_of_week, day_of_month, month_of_year, timezone) 或 CrontabSchedule 对象,用于更新周期任务
  151. - task: 新的任务函数名
  152. - args: 新的参数列表
  153. - kwargs: 新的关键字参数字典
  154. - enabled: 是否启用任务(True 或 False)
  155. @return:
  156. None
  157. """
  158. periodic_task = PeriodicTask.objects.get(name=name)
  159. # 更新通用属性
  160. if 'task' in kwargs:
  161. periodic_task.task = kwargs['task']
  162. if 'args' in kwargs:
  163. periodic_task.args = json.dumps(kwargs['args'])
  164. if 'kwargs' in kwargs:
  165. periodic_task.kwargs = json.dumps(kwargs['kwargs'])
  166. if 'enabled' in kwargs:
  167. periodic_task.enabled = kwargs['enabled']
  168. # 更新不同类型任务的特定属性
  169. if 'interval' in kwargs:
  170. if isinstance(kwargs['interval'], tuple):
  171. every, period = kwargs['interval']
  172. schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
  173. else:
  174. schedule = kwargs['interval']
  175. periodic_task.interval = schedule
  176. if 'clocked_time' in kwargs:
  177. if isinstance(kwargs['clocked_time'], str):
  178. clocked_time = CommonService.get_date_from_timestamp(
  179. CommonService.convert_to_timestamp(0, kwargs['clocked_time']), 8)
  180. else:
  181. clocked_time = kwargs['clocked_time']
  182. schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
  183. periodic_task.clocked = schedule
  184. if 'crontab' in kwargs:
  185. if isinstance(kwargs['crontab'], tuple):
  186. minute, hour, day_of_week, day_of_month, month_of_year, timezone = kwargs['crontab']
  187. schedule, _ = CrontabSchedule.objects.get_or_create(
  188. minute=minute,
  189. hour=hour,
  190. day_of_week=day_of_week,
  191. day_of_month=day_of_month,
  192. month_of_year=month_of_year,
  193. timezone=timezone
  194. )
  195. else:
  196. schedule = kwargs['crontab']
  197. periodic_task.crontab = schedule
  198. periodic_task.save()