# @Author    : Rocky
# @File      : CeleryBeatObject.py
# @Time      : 2024/3/21 16:35
import json
import zoneinfo

from django.core.exceptions import ObjectDoesNotExist
from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule

from Model.models import TimeZoneInfo
from Service.CommonService import CommonService

# Hour offset (UTC+8) passed to CommonService.get_date_from_timestamp when
# building the clocked time for ClockedSchedule rows.
_CLOCKED_TZ_OFFSET_HOURS = 8

# Schedule foreign keys on PeriodicTask; django-celery-beat requires that
# exactly one of them is set on a task.
_SCHEDULE_FIELDS = ('interval', 'crontab', 'solar', 'clocked')


def _dump_call_params(args, kwargs):
    """
    Serialize task call arguments to the JSON strings stored on PeriodicTask.
    @param args: positional arguments (list) or None for no arguments
    @param kwargs: keyword arguments (dict) or None for no arguments
    @return: tuple (args_json, kwargs_json)
    """
    args_json = json.dumps([] if args is None else args)
    kwargs_json = json.dumps({} if kwargs is None else kwargs)
    return args_json, kwargs_json


def _set_task_enabled(name, enabled):
    """
    Set the ``enabled`` flag of the task called *name*; a missing task is ignored.
    @param name: task name
    @param enabled: new value of the flag
    @return:
    """
    try:
        periodic_task = PeriodicTask.objects.get(name=name)
        periodic_task.enabled = enabled
        # save() (not QuerySet.update) so django-celery-beat's own save logic runs.
        periodic_task.save()
    except ObjectDoesNotExist:
        # Best effort: enabling/disabling an unknown task is a no-op.
        pass


def _assign_schedule(periodic_task, field, schedule):
    """
    Put *schedule* on *periodic_task* as its *field* schedule.

    When a real schedule is assigned, the other schedule foreign keys are
    cleared so that the task ends up with exactly one schedule. Assigning
    None only clears *field* itself.
    @param periodic_task: PeriodicTask instance being modified (not saved here)
    @param field: one of the names in _SCHEDULE_FIELDS
    @param schedule: schedule object or None
    @return:
    """
    setattr(periodic_task, field, schedule)
    if schedule is None:
        return
    for other_field in _SCHEDULE_FIELDS:
        if other_field != field:
            setattr(periodic_task, other_field, None)


class CeleryBeatObj:
    # Celery beat helper: wraps creation, update, pause/resume and deletion
    # of django-celery-beat periodic tasks.
    # https://github.com/celery/django-celery-beat

    @staticmethod
    def creat_interval_task(every, period, name, task, args=None, kwargs=None):
        """
        Create an interval task.
        @param every: interval length
        @param period: interval unit: seconds, minutes, hours, days (or IntervalSchedule.SECONDS...)
        @param name: task name
        @param task: task function
        @param args: positional arguments for the task (JSON-serializable list)
        @param kwargs: keyword arguments for the task (JSON-serializable dict)
        @return:
        """
        args, kwargs = _dump_call_params(args, kwargs)
        schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
        PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)

    @staticmethod
    def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='', args=None, kwargs=None):
        """
        Create a one-off task that runs at a fixed time.
        @param name: task name
        @param task: task function
        @param time_stamp: timestamp of the run time; when 0 it is derived from time_string
        @param timezone_offset: timezone offset used to interpret time_string
        @param time_string: time string, only used when time_stamp is 0
        @param args: positional arguments for the task (JSON-serializable list)
        @param kwargs: keyword arguments for the task (JSON-serializable dict)
        @return:
        """
        args, kwargs = _dump_call_params(args, kwargs)
        # No timestamp given: convert the time string to a timestamp.
        if time_stamp == 0:
            time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
        # Convert the timestamp to a UTC+8 time string.
        clocked_time = CommonService.get_date_from_timestamp(time_stamp, _CLOCKED_TZ_OFFSET_HOURS)
        schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
        PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)

    @staticmethod
    def creat_crontab_task(
            timezone_offset, name, task,
            minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', args=None, kwargs=None):
        """
        Create a crontab (recurring) task.
        @param timezone_offset: timezone offset, looked up in TimeZoneInfo.tz to find the zone name
        @param name: task name
        @param task: task function
        @param minute: minute field
        @param hour: hour field
        @param day_of_week: day of week, 0-6 for Sunday to Saturday, or sun,mon,tue,wed,thu,fri,sat
        @param day_of_month: day of month field
        @param month_of_year: month of year field
        @param args: positional arguments for the task (JSON-serializable list)
        @param kwargs: keyword arguments for the task (JSON-serializable dict)
        @return:
        @raise ValueError: no TimeZoneInfo row provides a zone name for timezone_offset
        """
        args, kwargs = _dump_call_params(args, kwargs)
        # A single sliced query replaces the exists() + [0] pair.
        zone_rows = list(TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')[:1])
        if not zone_rows:
            # Previously this path crashed with UnboundLocalError on `timezone`.
            raise ValueError('no TimeZoneInfo entry for timezone offset {!r}'.format(timezone_offset))
        timezone = zoneinfo.ZoneInfo(zone_rows[0]['zone_info'])
        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=minute, hour=hour, day_of_week=day_of_week, day_of_month=day_of_month,
            month_of_year=month_of_year, timezone=timezone)
        PeriodicTask.objects.create(crontab=schedule, name=name, task=task, args=args, kwargs=kwargs)

    @staticmethod
    def disable_task(name):
        """
        Pause a task. Does nothing when no task has this name.
        @param name: task name
        @return:
        """
        _set_task_enabled(name, False)

    @staticmethod
    def enable_task(name):
        """
        Resume a task. Does nothing when no task has this name.
        @param name: task name
        @return:
        """
        _set_task_enabled(name, True)

    @staticmethod
    def del_task(name):
        """
        Delete a task, plus its schedule rows when nothing else uses them.

        Schedules are created with get_or_create and may therefore be shared
        by several tasks. Deleting a shared schedule would cascade to every
        task that references it, so a schedule is only removed once no
        PeriodicTask points at it any more.
        @param name: task name
        @return:
        """
        periodic_task = PeriodicTask.objects.filter(name=name).first()
        if periodic_task is None:
            return
        # Remember the schedule ids before the task row disappears.
        schedule_refs = (
            (IntervalSchedule, periodic_task.interval_id),
            (CrontabSchedule, periodic_task.crontab_id),
            (ClockedSchedule, periodic_task.clocked_id),
        )
        periodic_task.delete()
        for schedule_model, schedule_pk in schedule_refs:
            if schedule_pk is not None:
                # Only delete the schedule if it is now orphaned.
                schedule_model.objects.filter(pk=schedule_pk, periodictask__isnull=True).delete()

    @staticmethod
    def update_task(name, **kwargs):
        """
        Update a task.

        Assigning a schedule of one type clears the task's other schedule
        fields, so a task can be switched between schedule types. If several
        schedule keys are given, they are applied in the order interval,
        clocked_time, crontab and the last non-None one wins.
        @param name: task name
        @param kwargs: attributes to update and their new values (all optional)
            - interval: (every, period) or an IntervalSchedule object, updates an interval task
            - clocked_time: str or datetime, updates the run time of a clocked task;
              a str is converted with timezone offset 0 and rendered as UTC+8
            - crontab: (minute, hour, day_of_week, day_of_month, month_of_year, timezone)
              or a CrontabSchedule object, updates a crontab task
            - task: new task function name
            - args: new positional argument list
            - kwargs: new keyword argument dict
            - enabled: whether the task is enabled (True or False)
        @return: None
        @raise PeriodicTask.DoesNotExist: no task has this name
        """
        periodic_task = PeriodicTask.objects.get(name=name)

        # Attributes common to every task type.
        if 'task' in kwargs:
            periodic_task.task = kwargs['task']
        if 'args' in kwargs:
            periodic_task.args = json.dumps(kwargs['args'])
        if 'kwargs' in kwargs:
            periodic_task.kwargs = json.dumps(kwargs['kwargs'])
        if 'enabled' in kwargs:
            periodic_task.enabled = kwargs['enabled']

        # Schedule-specific attributes.
        if 'interval' in kwargs:
            interval = kwargs['interval']
            if isinstance(interval, (tuple, list)):
                every, period = interval
                schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
            else:
                schedule = interval
            _assign_schedule(periodic_task, 'interval', schedule)

        if 'clocked_time' in kwargs:
            clocked_time = kwargs['clocked_time']
            if isinstance(clocked_time, str):
                clocked_time = CommonService.get_date_from_timestamp(
                    CommonService.convert_to_timestamp(0, clocked_time), _CLOCKED_TZ_OFFSET_HOURS)
            schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
            _assign_schedule(periodic_task, 'clocked', schedule)
            # django-celery-beat only accepts clocked schedules on one-off tasks.
            periodic_task.one_off = True

        if 'crontab' in kwargs:
            crontab = kwargs['crontab']
            if isinstance(crontab, (tuple, list)):
                minute, hour, day_of_week, day_of_month, month_of_year, timezone = crontab
                schedule, _ = CrontabSchedule.objects.get_or_create(
                    minute=minute, hour=hour, day_of_week=day_of_week,
                    day_of_month=day_of_month, month_of_year=month_of_year, timezone=timezone
                )
            else:
                schedule = crontab
            _assign_schedule(periodic_task, 'crontab', schedule)

        periodic_task.save()