Browse Source

使用celery设置智能场景定时任务

locky 1 year ago
parent
commit
34cabd6874

+ 38 - 2
Controller/CeleryTasks/tasks.py

@@ -2,14 +2,19 @@
 # @File      : tasks.py
 # @Time      : 2024/3/12 14:23
 import django
+import requests
+
+from Ansjer.Config.gatewaySensorConfig import DEVICE_TYPE, SMART_SOCKET_TOPIC, EVENT_TYPE
+from Service.CommonService import CommonService
+
 django.setup()
 import time
 from Ansjer.celery import app
-from Ansjer.config import LOGGER
-from Ansjer.config import CONFIG_INFO
+from Ansjer.config import LOGGER, CONFIG_INFO, DETECT_PUSH_DOMAINS
 from Model.models import Device_User
 
 
+# ###################测试函数
 @app.task
 def hello():
     device_user_qs = Device_User.objects.filter(username='13138137872').values('NickName')
@@ -27,3 +32,34 @@ def test(arg):
 @app.task
 def add(x, y):
     print(x + y)
+# ###################
+
+
+@app.task
+def loocam_smart_scene(device_type, event_type, serial_number, scene_id=0):
+    """
+    loocam智能场景任务
+    @param device_type: 设备类型
+    @param event_type: 事件类型
+    @param serial_number: 序列号
+    @param scene_id: 场景id
+    @return:
+    """
+    if device_type == DEVICE_TYPE['socket']:
+        topic_name = SMART_SOCKET_TOPIC.format(serial_number)
+        status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
+        msg = {
+            'type': 1,
+            'data': {'deviceSwitch': status}
+        }
+        CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
+
+    # 没有设备任务时,最后一个任务上报场景日志
+    if scene_id:
+        data = {
+            'sceneId': scene_id,
+            'status': 1
+        }
+        url = DETECT_PUSH_DOMAINS + 'gatewayService/sceneLogPush'
+        requests.post(url=url, data=data, timeout=8)
+

+ 14 - 16
Controller/SensorGateway/SmartSceneController.py

@@ -20,6 +20,7 @@ from Ansjer.Config.gatewaySensorConfig import SMART_SCENE_TOPIC, SENSOR_TYPE, EV
 from Model.models import FamilyRoomDevice, GatewaySubDevice, FamilyRoom, SmartScene, EffectiveTime, Device_Info, \
     SceneLog
 from Object.ApschedulerObject import ApschedulerObject
+from Object.CeleryBeatObject import CeleryBeatObj
 from Object.ResponseObject import ResponseObject
 from Service.CommonService import CommonService
 
@@ -1146,7 +1147,7 @@ class SmartSceneView(View):
                     # 无设备任务,且是最后一个任务,需要通过scene_id上报场景日志
                     if no_device_task and is_last_task:
                         smart_scene_id = scene_id
-                    task_temp['task_id'] = cls.create_aps_job(
+                    task_temp['task_id'] = cls.create_celery_task(
                         condition, minutes, delay_time, tz, repeat, sensor_type, event_type, serial_number,
                         smart_scene_id)
                 scene_task_list.append(task_temp)
@@ -1299,7 +1300,7 @@ class SmartSceneView(View):
             return False
 
     @classmethod
-    def create_aps_job(
+    def create_celery_task(
             cls, condition, minutes, delay_time, tz, repeat, device_type, event_type, serial_number, scene_id):
         """
         创建定时任务
@@ -1315,25 +1316,25 @@ class SmartSceneView(View):
         @param scene_id: 场景id
         @return: task_id
         """
-        task_id = serial_number + '_'
-        apscheduler_obj = ApschedulerObject(tz)
+        celery_beat_obj = CeleryBeatObj()
+        name = serial_number + '_'
+        task = 'Controller.CeleryTasks.tasks.loocam_smart_scene'
+        args = [device_type, event_type, serial_number, scene_id]
         # 一次性任务
         if repeat == 0:
             time_stamp = condition['time_dict']['time_stamp'] + delay_time
-            task_id += str(time_stamp)
-            apscheduler_obj.create_date_job(func=cls.pub_mqtt, task_id=task_id, time_stamp=time_stamp,
-                                            args=(device_type, event_type, serial_number, scene_id))
+            name += str(time_stamp)
+            celery_beat_obj.creat_clocked_task(name=name, task=task, time_stamp=time_stamp, args=args)
         # 周期任务
         else:
             hour, minute, second, is_next_day = cls.handle_delay_time(minutes, delay_time)
             # 加上延时,如果执行时间超过23:59,隔天执行
             weeks = cls.int_to_weeks(repeat, is_next_day)
             time_str = weeks + '_{:02d}{:02d}{:02d}'.format(hour, minute, second)
-            task_id += time_str
-            apscheduler_obj.create_cron_job(func=cls.pub_mqtt, task_id=task_id, day_of_week=weeks,
-                                            hour=hour, minute=minute,
-                                            args=(device_type, event_type, serial_number, scene_id))
-        return task_id
+            name += time_str
+            celery_beat_obj.creat_crontab_task(
+                timezone_offset=tz, name=name, task=task, minute=minute, hour=hour, day_of_week=weeks, args=args)
+        return name
 
     @staticmethod
     def handle_delay_time(minutes, delay_time):
@@ -1360,7 +1361,7 @@ class SmartSceneView(View):
     def int_to_weeks(repeat, is_next_day=False):
         """
         十进制转星期周期
-        @param repeat: 星期周期的十进制数,如127 -> 0,1,2,3,4,5,6
+        @param repeat: 星期周期的十进制数,如127 -> 1,2,3,4,5,6,7
         @param is_next_day: 是否隔天
         @return: weeks
         """
@@ -1371,10 +1372,7 @@ class SmartSceneView(View):
         next_day = 1 if is_next_day else 0
         for i, bit in enumerate(bin_str):
             if bit == '1':
-                # 7 -> 0
                 week = i + next_day
-                if week == 7:
-                    week = 0
                 weeks += str(week) + ','
         # 删除最后一个逗号并返回结果
         return weeks[:-1]

+ 7 - 4
Object/CeleryBeatObject.py

@@ -37,13 +37,14 @@ class CeleryBeatObj:
         PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)
 
     @staticmethod
-    def creat_clocked_task(timezone_offset, time_string, name, task, args=None, kwargs=None):
+    def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='',  args=None, kwargs=None):
         """
         创建定时任务
-        @param timezone_offset: 时区偏移量
-        @param time_string: 时间字符串
         @param name: 任务名称
         @param task: 任务函数
+        @param time_stamp: 时间戳
+        @param timezone_offset: 时区偏移量
+        @param time_string: 时间字符串
         @param args: 参数
         @param kwargs: 参数
         @return:
@@ -55,7 +56,9 @@ class CeleryBeatObj:
             kwargs = {}
         kwargs = json.dumps(kwargs)
 
-        time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
+        # 不传时间戳时,将时间字符串转为时间戳
+        if time_stamp == 0:
+            time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
         clocked_time = CommonService.get_date_from_timestamp(time_stamp, timezone_offset)
         schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
         PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)

+ 3 - 1
requirements.txt

@@ -82,4 +82,6 @@ xmltodict==0.13.0
 gunicorn==20.1.0
 alibabacloud_green20220302==1.0.8
 django-apscheduler==0.6.2
-geoip2==4.7.0
+geoip2==4.7.0
+celery==5.3.6
+django-celery-beat==2.6.0