Эх сурвалжийг харах

修改暂停,恢复,删除任务,优化创建一次性任务函数

locky 1 жил өмнө
parent
commit
d9ad24b188

+ 3 - 0
Ansjer/Config/gatewaySensorConfig.py

@@ -5,6 +5,9 @@
 @File :gatewaySensorConfig.py
 """
 
+# 智能场景定时任务
+SMART_SCENE_TASK = 'Controller.CeleryTasks.tasks.loocam_smart_scene'
+
 # MQTT主题名
 SMART_SCENE_TOPIC = 'loocam/gateway_sensor/smart_scene/{}'
 GET_SCENE_TOPIC = 'loocam/gateway_sensor/get_scene/{}'

+ 3 - 0
Controller/CeleryTasks/tasks.py

@@ -2,6 +2,9 @@
 # @File      : tasks.py
 # @Time      : 2024/3/12 14:23
 # 解决models导入失败问题
+# *************
+# 修改任务函数后需要重启supervisor的celery和celery-beat才能失效
+# *************
 import django
 django.setup()
 import requests

+ 27 - 22
Controller/SensorGateway/SmartSceneController.py

@@ -16,10 +16,9 @@ from django.views import View
 from Ansjer.config import DETECT_PUSH_DOMAINS
 from Ansjer.Config.gatewaySensorConfig import SMART_SCENE_TOPIC, SENSOR_TYPE, EVENT_TYPE, SCENE_EVENT_CREATE, \
     SCENE_EVENT_EDIT, SCENE_EVENT_DELETE, SCENE_STATUS_ON, SCENE_STATUS_OFF, SCENE_EVENT_EDIT_STATUS, \
-    VOICE_AUDITION_TOPIC, SMART_SOCKET_TOPIC, DEVICE_TYPE
+    VOICE_AUDITION_TOPIC, SMART_SOCKET_TOPIC, DEVICE_TYPE, SMART_SCENE_TASK
 from Model.models import FamilyRoomDevice, GatewaySubDevice, FamilyRoom, SmartScene, EffectiveTime, Device_Info, \
     SceneLog
-from Object.ApschedulerObject import ApschedulerObject
 from Object.CeleryBeatObject import CeleryBeatObj
 from Object.ResponseObject import ResponseObject
 from Service.CommonService import CommonService
@@ -840,7 +839,7 @@ class SmartSceneView(View):
                     scene_data = smart_scene['scene_data']
                     if scene_data:
                         scene_data_dict = eval(scene_data)
-                        cls.del_aps_job(scene_data_dict)
+                        cls.del_celery_task(scene_data_dict)
 
                 smart_scene_qs.delete()
 
@@ -1111,7 +1110,7 @@ class SmartSceneView(View):
         @return: task_list, scene_data
         """
         # 删除旧的定时任务
-        cls.del_aps_job(scene_data_dict)
+        cls.del_celery_task(scene_data_dict)
         task_list = []
         scene_task_list = []
         # 不需要设备触发的任务
@@ -1318,7 +1317,6 @@ class SmartSceneView(View):
         """
         celery_beat_obj = CeleryBeatObj()
         name = serial_number + '_'
-        task = 'Controller.CeleryTasks.tasks.loocam_smart_scene'
         kwargs = {
             'device_type': device_type,
             'event_type': event_type,
@@ -1330,7 +1328,7 @@ class SmartSceneView(View):
             time_stamp = condition['time_dict']['time_stamp'] + delay_time
             name += str(time_stamp)
             celery_beat_obj.creat_clocked_task(
-                name=name, task=task, time_stamp=time_stamp, timezone_offset=tz, kwargs=kwargs)
+                name=name, task=SMART_SCENE_TASK, time_stamp=time_stamp, kwargs=kwargs)
         # 周期任务
         else:
             hour, minute, second, is_next_day = cls.handle_delay_time(minutes, delay_time)
@@ -1339,7 +1337,8 @@ class SmartSceneView(View):
             time_str = weeks + '_{:02d}{:02d}{:02d}'.format(hour, minute, second)
             name += time_str
             celery_beat_obj.creat_crontab_task(
-                timezone_offset=tz, name=name, task=task, minute=minute, hour=hour, day_of_week=weeks, kwargs=kwargs)
+                timezone_offset=tz, name=name, task=SMART_SCENE_TASK, minute=minute, hour=hour, day_of_week=weeks,
+                kwargs=kwargs)
         return name
 
     @staticmethod
@@ -1412,7 +1411,7 @@ class SmartSceneView(View):
             requests.post(url=url, data=data, timeout=8)
 
     @staticmethod
-    def del_aps_job(scene_data_dict):
+    def del_celery_task(scene_data_dict):
         """
         删除定时任务
         @param scene_data_dict: 场景数据
@@ -1420,13 +1419,13 @@ class SmartSceneView(View):
         """
         if scene_data_dict is not None:
             if scene_data_dict['condition'].get('time'):
-                apscheduler_obj = ApschedulerObject()
+                celery_beat_obj = CeleryBeatObj()
                 time_task_list = scene_data_dict['task_list']
                 for time_task in time_task_list:
                     # 存在任务则删除
                     task_id = time_task.get('task_id')
                     if task_id:
-                        apscheduler_obj.del_job(task_id)
+                        celery_beat_obj.del_task(task_id)
 
     @classmethod
     def pause_or_resume_job(cls, scene_data_dict, scene_status, scene_id):
@@ -1440,18 +1439,18 @@ class SmartSceneView(View):
         # 判断条件是否为设置时间
         time_type = scene_data_dict['condition'].get('time')
         if time_type:
-            apscheduler_obj = ApschedulerObject()
+            celery_beat_obj = CeleryBeatObj()
             # 关闭: 暂停任务, 打开: 恢复任务
             if scene_status == SCENE_STATUS_OFF:
-                cls.pause_time_job(apscheduler_obj, scene_data_dict)
+                cls.pause_time_job(celery_beat_obj, scene_data_dict)
             else:
-                return cls.resume_time_job(apscheduler_obj, time_type, scene_data_dict, scene_id)
+                return cls.resume_time_job(celery_beat_obj, time_type, scene_data_dict, scene_id)
 
     @staticmethod
-    def pause_time_job(apscheduler_obj, scene_data_dict):
+    def pause_time_job(celery_beat_obj, scene_data_dict):
         """
         暂停定时任务
-        @param apscheduler_obj: apscheduler对象
+        @param celery_beat_obj: celery_beat对象
         @param scene_data_dict: 场景数据
         @return:
         """
@@ -1460,15 +1459,15 @@ class SmartSceneView(View):
             task_id = time_task.get('task_id')
             if task_id:
                 try:
-                    apscheduler_obj.pause_job(task_id)
+                    celery_beat_obj.disable_task(task_id)
                 except Exception:
                     continue
 
     @classmethod
-    def resume_time_job(cls, apscheduler_obj, time_type, scene_data_dict, scene_id):
+    def resume_time_job(cls, celery_beat_obj, time_type, scene_data_dict, scene_id):
         """
         恢复定时任务
-        @param apscheduler_obj: apscheduler对象
+        @param celery_beat_obj: celery_beat对象
         @param time_type: 时间类型: date, cron
         @param scene_data_dict: 场景数据
         @param scene_id: 场景id
@@ -1480,7 +1479,7 @@ class SmartSceneView(View):
                 task_id = time_task.get('task_id')
                 if task_id:
                     try:
-                        apscheduler_obj.resume_job(task_id)
+                        celery_beat_obj.enable_task(task_id)
                     except Exception:
                         continue
             return None
@@ -1493,7 +1492,7 @@ class SmartSceneView(View):
                     task_id = time_task.get('task_id')
                     if task_id:
                         try:
-                            apscheduler_obj.resume_job(task_id)
+                            celery_beat_obj.enable_task(task_id)
                         except Exception:
                             continue
                 return None
@@ -1508,8 +1507,14 @@ class SmartSceneView(View):
                     task_id = serial_number + '_' + str(time_stamp)
                     # 最后一个任务使用传入的scene_id
                     smart_scene_id = scene_id if index == task_list_len-1 else 0
-                    apscheduler_obj.create_date_job(func=cls.pub_mqtt, task_id=task_id, time_stamp=time_stamp,
-                                                    args=(device_type, event_type, serial_number, smart_scene_id))
+                    kwargs = {
+                        'device_type': device_type,
+                        'event_type': event_type,
+                        'serial_number': serial_number,
+                        'scene_id': smart_scene_id
+                    }
+                    celery_beat_obj.creat_clocked_task(
+                        name=task_id, task=SMART_SCENE_TASK, time_stamp=time_stamp, kwargs=kwargs)
                     # 更新task_id
                     time_task['task_id'] = task_id
                 # 更新场景数据的时间戳

+ 2 - 1
Object/CeleryBeatObject.py

@@ -59,7 +59,8 @@ class CeleryBeatObj:
         # 不传时间戳时,将时间字符串转为时间戳
         if time_stamp == 0:
             time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
-        clocked_time = CommonService.get_date_from_timestamp(time_stamp, timezone_offset)
+        # 时间戳转为东八区的时间字符串
+        clocked_time = CommonService.get_date_from_timestamp(time_stamp, 8)
         schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
         PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)