Browse Source

celery增加update任务方法

zhangdongming 1 year ago
parent
commit
62881518e4

+ 14 - 1
AdminController/CloudServiceManage/AgentOrderController.py

@@ -17,7 +17,7 @@ from django.views import View
 from AgentModel.models import AgentDevice, AgentCloudServicePackage, AgentCustomerPackage, AgentDeviceOrder, \
     AgentDeviceOrderInstallment, AgentAccount
 from Ansjer.config import LOGGER
-from Model.models import Order_Model, Store_Meal, UnicomCombo
+from Model.models import Order_Model, Store_Meal, UnicomCombo, TimeZoneInfo
 from Object.CeleryBeatObject import CeleryBeatObj
 from Object.ResponseObject import ResponseObject
 from Object.TokenObject import TokenObject
@@ -69,6 +69,19 @@ class AgentOrderView(View):
         elif operation == 'delSettlementJob':
             self.del_settlement_job()
             return response.json(0)
+        elif operation == 'updateSettlementJob':
+            self.update_settlement_job()
+            return response.json(0)
+
+    @classmethod
+    def update_settlement_job(cls):
+        celery_beat_obj = CeleryBeatObj()
+        job_name = 'Agent-updateSettlement'
+        time_zone_info_qs = TimeZoneInfo.objects.filter(tz=8).values('zone_info')
+        if time_zone_info_qs.exists():
+            time_zone = time_zone_info_qs[0]['zone_info']
+            cron_tuple = ('*/3', '*', '*', '*', '*', time_zone)
+            celery_beat_obj.update_task(name=job_name, crontab=cron_tuple)
 
     @classmethod
     def add_settlement_job(cls):

+ 4 - 1
Controller/CeleryTasks/tasks.py

@@ -7,6 +7,7 @@
 # *************
 
 # 需要在setup之前导入celery app,否则缺少Django环境配置
+from AdminController.CloudServiceManage.AgentOrderController import AgentOrderView
 from Ansjer.celery import app
 import django
 
@@ -77,4 +78,6 @@ def loocam_smart_scene(device_type, event_type, serial_number, scene_id=0):
 
 @app.task
 def update_installment_settlement_order():
-    LOGGER.info('update_installment_settlement_order测试周期任务')
+    LOGGER.info('start周期结算代理商订单任务')
+    AgentOrderView.update_periodic_settlement()
+    LOGGER.info('end周期结算代理商订单任务')

+ 66 - 0
Object/CeleryBeatObject.py

@@ -132,3 +132,69 @@ class CeleryBeatObj:
         @return:
         """
         PeriodicTask.objects.get(name=name).delete()
+
+    @staticmethod
+    def update_task(name, **kwargs):
+        """
+        更新任务
+
+        @param name: 任务名称
+        @param kwargs: 需要更新的属性及对应的值(可选)
+            - interval: (every, period) 或 IntervalSchedule 对象,用于更新间隔任务
+            - clocked_time: str 或 datetime,用于更新定时任务的时间戳
+            - crontab: (minute, hour, day_of_week, day_of_month, month_of_year, timezone) 或 CrontabSchedule 对象,用于更新周期任务
+            - task: 新的任务函数名
+            - args: 新的参数列表
+            - kwargs: 新的关键字参数字典
+            - enabled: 是否启用任务(True 或 False)
+        @return:
+            None
+        """
+        periodic_task = PeriodicTask.objects.get(name=name)
+
+        # 更新通用属性
+        if 'task' in kwargs:
+            periodic_task.task = kwargs['task']
+        if 'args' in kwargs:
+            periodic_task.args = json.dumps(kwargs['args'])
+        if 'kwargs' in kwargs:
+            periodic_task.kwargs = json.dumps(kwargs['kwargs'])
+        if 'enabled' in kwargs:
+            periodic_task.enabled = kwargs['enabled']
+
+        # 更新不同类型任务的特定属性
+        if 'interval' in kwargs:
+            if isinstance(kwargs['interval'], tuple):
+                every, period = kwargs['interval']
+                schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
+            else:
+                schedule = kwargs['interval']
+            periodic_task.interval = schedule
+
+        if 'clocked_time' in kwargs:
+            if isinstance(kwargs['clocked_time'], str):
+                clocked_time = CommonService.get_date_from_timestamp(
+                    CommonService.convert_to_timestamp(0, kwargs['clocked_time']), 8)
+            else:
+                clocked_time = kwargs['clocked_time']
+            schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
+            periodic_task.clocked = schedule
+
+        if 'crontab' in kwargs:
+            if isinstance(kwargs['crontab'], tuple):
+                minute, hour, day_of_week, day_of_month, month_of_year, timezone = kwargs['crontab']
+                schedule, _ = CrontabSchedule.objects.get_or_create(
+                    minute=minute,
+                    hour=hour,
+                    day_of_week=day_of_week,
+                    day_of_month=day_of_month,
+                    month_of_year=month_of_year,
+                    timezone=timezone
+                )
+            else:
+                schedule = kwargs['crontab']
+            periodic_task.crontab = schedule
+
+        periodic_task.save()
+
+