123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- # @Author : Rocky
- # @File : CeleryBeatObject.py
- # @Time : 2024/3/21 16:35
- import json
- import zoneinfo
- from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule
- from Model.models import TimeZoneInfo
- from Service.CommonService import CommonService
class CeleryBeatObj:
    """Thin wrapper around django-celery-beat models for managing scheduled tasks.

    Every method is a static helper that creates, toggles or deletes a
    ``PeriodicTask`` row identified by its ``name``.

    See https://github.com/celery/django-celery-beat
    """

    @staticmethod
    def _dump_params(args, kwargs):
        """
        Serialize task arguments to the JSON strings passed to PeriodicTask.
        @param args: positional arguments for the task; None means an empty list
        @param kwargs: keyword arguments for the task; None means an empty dict
        @return: tuple (args_json, kwargs_json)
        """
        return (
            json.dumps([] if args is None else args),
            json.dumps({} if kwargs is None else kwargs),
        )

    @staticmethod
    def _set_enabled(name, enabled):
        """
        Load the task called ``name`` and persist its ``enabled`` flag.
        @param name: task name
        @param enabled: new value for PeriodicTask.enabled
        @return: None
        @raise PeriodicTask.DoesNotExist: propagated from ``objects.get`` when no task matches
        """
        periodic_task = PeriodicTask.objects.get(name=name)
        periodic_task.enabled = enabled
        # Go through the model's save() instead of a queryset update():
        # django-celery-beat documents that bulk updates skip its change tracking.
        periodic_task.save()

    @staticmethod
    def creat_interval_task(every, period, name, task, args=None, kwargs=None):
        """
        Create a task that runs at a fixed interval.
        @param every: interval length
        @param period: interval unit: seconds, minutes, hours, days (or IntervalSchedule.SECONDS ...)
        @param name: task name
        @param task: task function
        @param args: positional arguments for the task (JSON-serializable)
        @param kwargs: keyword arguments for the task (JSON-serializable)
        @return: None
        """
        args_json, kwargs_json = CeleryBeatObj._dump_params(args, kwargs)
        # Reuse an existing identical schedule row instead of duplicating it.
        schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
        PeriodicTask.objects.create(
            interval=schedule, name=name, task=task, args=args_json, kwargs=kwargs_json)

    @staticmethod
    def creat_clocked_task(timezone_offset, time_string, name, task, args=None, kwargs=None):
        """
        Create a one-off task that runs at a specific point in time.
        @param timezone_offset: timezone offset
        @param time_string: time string
        @param name: task name
        @param task: task function
        @param args: positional arguments for the task (JSON-serializable)
        @param kwargs: keyword arguments for the task (JSON-serializable)
        @return: None
        """
        args_json, kwargs_json = CeleryBeatObj._dump_params(args, kwargs)
        # The time string is first turned into a timestamp and then into the
        # value stored as clocked_time; both conversions receive the same offset.
        time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
        clocked_time = CommonService.get_date_from_timestamp(time_stamp, timezone_offset)
        schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
        PeriodicTask.objects.create(
            clocked=schedule, one_off=True, name=name, task=task,
            args=args_json, kwargs=kwargs_json)

    @staticmethod
    def creat_crontab_task(
            timezone_offset, name, task,
            minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*',
            args=None, kwargs=None):
        """
        Create a crontab-style recurring task.

        The timezone is looked up in TimeZoneInfo by ``tz=timezone_offset``.
        When no row matches, a warning is logged and nothing is created.

        @param timezone_offset: timezone offset
        @param name: task name
        @param task: task function
        @param minute: minute field
        @param hour: hour field
        @param day_of_week: day-of-week field; names such as mon,tue,wed,thu,fri,sat,sun
            are also accepted. The original note says 1-7 map to Monday-Sunday.
            NOTE(review): Celery documents 0-6 with Sunday = 0 - confirm.
        @param day_of_month: day-of-month field
        @param month_of_year: month-of-year field
        @param args: positional arguments for the task (JSON-serializable)
        @param kwargs: keyword arguments for the task (JSON-serializable)
        @return: None
        """
        import logging

        args_json, kwargs_json = CeleryBeatObj._dump_params(args, kwargs)

        # A single LIMIT 1 query replaces the exists() + [0] pair.
        rows = list(TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')[:1])
        if not rows:
            # Without a zone name there is no timezone to build the schedule with.
            logging.getLogger(__name__).warning(
                'No TimeZoneInfo row for tz=%s; crontab task %r was not created',
                timezone_offset, name)
            return

        schedule, _ = CrontabSchedule.objects.get_or_create(
            minute=minute,
            hour=hour,
            day_of_week=day_of_week,
            day_of_month=day_of_month,
            month_of_year=month_of_year,
            timezone=zoneinfo.ZoneInfo(rows[0]['zone_info']))
        PeriodicTask.objects.create(
            crontab=schedule, name=name, task=task, args=args_json, kwargs=kwargs_json)

    @staticmethod
    def disable_task(name):
        """
        Pause a task.
        @param name: task name
        @return: None
        """
        CeleryBeatObj._set_enabled(name, False)

    @staticmethod
    def enable_task(name):
        """
        Resume a task.
        @param name: task name
        @return: None
        """
        CeleryBeatObj._set_enabled(name, True)

    @staticmethod
    def del_task(name):
        """
        Delete a task.
        @param name: task name
        @return: None
        @raise PeriodicTask.DoesNotExist: propagated from ``objects.get`` when no task matches
        """
        PeriodicTask.objects.get(name=name).delete()
|