| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 | # @Author    : Rocky# @File      : CeleryBeatObject.py# @Time      : 2024/3/21 16:35import jsonimport zoneinfofrom django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedulefrom Model.models import TimeZoneInfofrom Service.CommonService import CommonServiceclass CeleryBeatObj:    # celery beat对象,封装定时任务函数    # https://github.com/celery/django-celery-beat    @staticmethod    def creat_interval_task(every, period, name, task, args=None, kwargs=None):        """        创建间隔任务        @param every: 时间间隔        @param period: 单位,seconds,minutes,hours,days(或IntervalSchedule.SECONDS...)        @param name: 任务名称        @param task: 任务函数        @param args: 参数        @param kwargs: 参数        @return:        """        if args is None:            args = []        args = json.dumps(args)        if kwargs is None:            kwargs = {}        kwargs = json.dumps(kwargs)        schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)        PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)    @staticmethod    def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='',  args=None, kwargs=None):        """        创建定时任务        @param name: 任务名称        @param task: 任务函数        @param time_stamp: 时间戳        @param timezone_offset: 时区偏移量        @param time_string: 时间字符串        @param args: 参数        @param kwargs: 参数        @return:        """        if args is None:            args = []        args = json.dumps(args)        if kwargs is None:            kwargs = {}        kwargs = json.dumps(kwargs)        # 不传时间戳时,将时间字符串转为时间戳        if time_stamp == 0:            time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)        # 时间戳转为东八区的时间字符串        clocked_time = CommonService.get_date_from_timestamp(time_stamp, 8)        schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)        PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)    @staticmethod    def creat_crontab_task(            timezone_offset, name, task,            minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', args=None, kwargs=None):        """        创建周期任务        @param timezone_offset: 时区偏移量        @param name: 任务名称        @param task: 任务函数        @param minute: 分        @param hour: 时        @param day_of_week: 周,1-7对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun        @param day_of_month: 月        @param month_of_year: 年        @param args: 参数        @param kwargs: 参数        @return:        """        if args is None:            args = []        args = json.dumps(args)        if kwargs is None:            kwargs = {}        kwargs = json.dumps(kwargs)        time_zone_info_qs = TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')        if time_zone_info_qs.exists():            zone_info = time_zone_info_qs[0]['zone_info']            timezone = zoneinfo.ZoneInfo(zone_info)            schedule, _ = CrontabSchedule.objects.get_or_create(                minute=minute,                hour=hour,                day_of_week=day_of_week,                day_of_month=day_of_month,                month_of_year=month_of_year,                timezone=timezone)            PeriodicTask.objects.create(crontab=schedule, name=name, task=task, args=args, kwargs=kwargs)    @staticmethod    def disable_task(name):        """        暂停任务        @param name: 任务名称        @return:        """        periodic_task = PeriodicTask.objects.get(name=name)        periodic_task.enabled = False        periodic_task.save()    @staticmethod    def enable_task(name):        """        恢复任务        @param name: 任务名称        @return:        """        periodic_task = PeriodicTask.objects.get(name=name)        periodic_task.enabled = True        periodic_task.save()    @staticmethod    def del_task(name):        """        删除任务        @param name: 任务名称        @return:        """        PeriodicTask.objects.get(name=name).delete()
 |