| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 | 
							- # @Author    : Rocky
 
- # @File      : CeleryBeatObject.py
 
- # @Time      : 2024/3/21 16:35
 
- import json
 
- import zoneinfo
 
- from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule
 
- from Model.models import TimeZoneInfo
 
- from Service.CommonService import CommonService
 
- from django.core.exceptions import ObjectDoesNotExist
 
- class CeleryBeatObj:
 
-     # celery beat对象,封装定时任务函数
 
-     # https://github.com/celery/django-celery-beat
 
-     @staticmethod
 
-     def creat_interval_task(every, period, name, task, args=None, kwargs=None):
 
-         """
 
-         创建间隔任务
 
-         @param every: 时间间隔
 
-         @param period: 单位,seconds,minutes,hours,days(或IntervalSchedule.SECONDS...)
 
-         @param name: 任务名称
 
-         @param task: 任务函数
 
-         @param args: 参数
 
-         @param kwargs: 参数
 
-         @return:
 
-         """
 
-         if args is None:
 
-             args = []
 
-         args = json.dumps(args)
 
-         if kwargs is None:
 
-             kwargs = {}
 
-         kwargs = json.dumps(kwargs)
 
-         schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
 
-         PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)
 
-     @staticmethod
 
-     def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='', args=None, kwargs=None):
 
-         """
 
-         创建定时任务
 
-         @param name: 任务名称
 
-         @param task: 任务函数
 
-         @param time_stamp: 时间戳
 
-         @param timezone_offset: 时区偏移量
 
-         @param time_string: 时间字符串
 
-         @param args: 参数
 
-         @param kwargs: 参数
 
-         @return:
 
-         """
 
-         if args is None:
 
-             args = []
 
-         args = json.dumps(args)
 
-         if kwargs is None:
 
-             kwargs = {}
 
-         kwargs = json.dumps(kwargs)
 
-         # 不传时间戳时,将时间字符串转为时间戳
 
-         if time_stamp == 0:
 
-             time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
 
-         # 时间戳转为东八区的时间字符串
 
-         clocked_time = CommonService.get_date_from_timestamp(time_stamp, 8)
 
-         schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
 
-         PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)
 
-     @staticmethod
 
-     def creat_crontab_task(
 
-             timezone_offset, name, task,
 
-             minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', args=None, kwargs=None):
 
-         """
 
-         创建周期任务
 
-         @param timezone_offset: 时区偏移量
 
-         @param name: 任务名称
 
-         @param task: 任务函数
 
-         @param minute: 分
 
-         @param hour: 时
 
-         @param day_of_week: 周,0-6对应周日到周六,也可写sun,mon,tue,wed,thu,fri,sat
 
-         @param day_of_month: 月
 
-         @param month_of_year: 年
 
-         @param args: 参数
 
-         @param kwargs: 参数
 
-         @return:
 
-         """
 
-         if args is None:
 
-             args = []
 
-         args = json.dumps(args)
 
-         if kwargs is None:
 
-             kwargs = {}
 
-         kwargs = json.dumps(kwargs)
 
-         time_zone_info_qs = TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')
 
-         if time_zone_info_qs.exists():
 
-             zone_info = time_zone_info_qs[0]['zone_info']
 
-             timezone = zoneinfo.ZoneInfo(zone_info)
 
-             schedule, _ = CrontabSchedule.objects.get_or_create(
 
-                 minute=minute,
 
-                 hour=hour,
 
-                 day_of_week=day_of_week,
 
-                 day_of_month=day_of_month,
 
-                 month_of_year=month_of_year,
 
-                 timezone=timezone)
 
-             PeriodicTask.objects.create(crontab=schedule, name=name, task=task, args=args, kwargs=kwargs)
 
-     @staticmethod
 
-     def disable_task(name):
 
-         """
 
-         暂停任务
 
-         @param name: 任务名称
 
-         @return:
 
-         """
 
-         try:
 
-             periodic_task = PeriodicTask.objects.get(name=name)
 
-             periodic_task.enabled = False
 
-             periodic_task.save()
 
-         except ObjectDoesNotExist:
 
-             pass
 
-     @staticmethod
 
-     def enable_task(name):
 
-         """
 
-         恢复任务
 
-         @param name: 任务名称
 
-         @return:
 
-         """
 
-         try:
 
-             periodic_task = PeriodicTask.objects.get(name=name)
 
-             periodic_task.enabled = True
 
-             periodic_task.save()
 
-         except ObjectDoesNotExist:
 
-             pass
 
-     @staticmethod
 
-     def del_task(name):
 
-         """
 
-         删除任务
 
-         @param name: 任务名称
 
-         @return:
 
-         """
 
-         IntervalSchedule.objects.filter(periodictask__name=name).delete()
 
-         CrontabSchedule.objects.filter(periodictask__name=name).delete()
 
-         ClockedSchedule.objects.filter(periodictask__name=name).delete()
 
-         PeriodicTask.objects.filter(name=name).delete()
 
-     @staticmethod
 
-     def update_task(name, **kwargs):
 
-         """
 
-         更新任务
 
-         @param name: 任务名称
 
-         @param kwargs: 需要更新的属性及对应的值(可选)
 
-             - interval: (every, period) 或 IntervalSchedule 对象,用于更新间隔任务
 
-             - clocked_time: str 或 datetime,用于更新定时任务的时间戳
 
-             - crontab: (minute, hour, day_of_week, day_of_month, month_of_year, timezone) 或 CrontabSchedule 对象,用于更新周期任务
 
-             - task: 新的任务函数名
 
-             - args: 新的参数列表
 
-             - kwargs: 新的关键字参数字典
 
-             - enabled: 是否启用任务(True 或 False)
 
-         @return:
 
-             None
 
-         """
 
-         periodic_task = PeriodicTask.objects.get(name=name)
 
-         # 更新通用属性
 
-         if 'task' in kwargs:
 
-             periodic_task.task = kwargs['task']
 
-         if 'args' in kwargs:
 
-             periodic_task.args = json.dumps(kwargs['args'])
 
-         if 'kwargs' in kwargs:
 
-             periodic_task.kwargs = json.dumps(kwargs['kwargs'])
 
-         if 'enabled' in kwargs:
 
-             periodic_task.enabled = kwargs['enabled']
 
-         # 更新不同类型任务的特定属性
 
-         if 'interval' in kwargs:
 
-             if isinstance(kwargs['interval'], tuple):
 
-                 every, period = kwargs['interval']
 
-                 schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
 
-             else:
 
-                 schedule = kwargs['interval']
 
-             periodic_task.interval = schedule
 
-         if 'clocked_time' in kwargs:
 
-             if isinstance(kwargs['clocked_time'], str):
 
-                 clocked_time = CommonService.get_date_from_timestamp(
 
-                     CommonService.convert_to_timestamp(0, kwargs['clocked_time']), 8)
 
-             else:
 
-                 clocked_time = kwargs['clocked_time']
 
-             schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
 
-             periodic_task.clocked = schedule
 
-         if 'crontab' in kwargs:
 
-             if isinstance(kwargs['crontab'], tuple):
 
-                 minute, hour, day_of_week, day_of_month, month_of_year, timezone = kwargs['crontab']
 
-                 schedule, _ = CrontabSchedule.objects.get_or_create(
 
-                     minute=minute,
 
-                     hour=hour,
 
-                     day_of_week=day_of_week,
 
-                     day_of_month=day_of_month,
 
-                     month_of_year=month_of_year,
 
-                     timezone=timezone
 
-                 )
 
-             else:
 
-                 schedule = kwargs['crontab']
 
-             periodic_task.crontab = schedule
 
-         periodic_task.save()
 
 
  |