Explorar el Código

智能开关改用celery定时器

peng hace 1 año
padre
commit
0fef8c0741

+ 21 - 0
Controller/CeleryTasks/tasks.py

@@ -20,6 +20,8 @@ import time
 from Ansjer.config import LOGGER, CONFIG_INFO, DETECT_PUSH_DOMAINS
 from Model.models import Device_User
 from AdminController.CloudServiceManage.AgentOrderController import AgentOrderView
+import os
+import threading
 
 
 # ###################测试函数
@@ -81,3 +83,22 @@ def update_installment_settlement_order():
     LOGGER.info('start周期结算代理商订单任务')
     AgentOrderView.update_periodic_settlement()
     LOGGER.info('end周期结算代理商订单任务')
+
+
+@app.task
+def send_mqtt(serial_number, topic_name, msg, task_id):
+    """
+    定时发送mqtt, (不要随意更改,否则定时任务不执行)
+    @param serial_number: 设备序列号
+    @param topic_name: 主题
+    @param msg: 消息
+    @param task_id: 任务id
+    @return: response
+    """
+    now_time = int(time.time())
+    msg['implementTime'] = now_time
+    result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
+    LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg,
+                                                                     now_time, task_id,
+                                                                     threading.get_ident(), os.getpid()))
+

+ 67 - 53
Controller/SensorGateway/SmartSwitchController.py

@@ -15,12 +15,13 @@ from Model.models import SwitchDimmingSettings, SwitchChronopher, Device_Info, S
     SwitchOperateLog
 from Object.RedisObject import RedisObject
 from Service.CommonService import CommonService
-from Object.ApschedulerObject import ApschedulerObject
+from Object.CeleryBeatObject import CeleryBeatObj
 from django.db import transaction
 from Ansjer.config import LOGGER
 
 APSCHEDULER_TOPIC_NAME = 'loocam/switch/time_scheduling/{}'  # 排程主题
 RESET_SWITCH_TOPIC_NAME = 'loocam/smart-switch/{}'  # 重置设备
+MQTT_TASK = 'Controller.CeleryTasks.tasks.send_mqtt'
 
 
 class SmartSwitchView(View):
@@ -51,6 +52,8 @@ class SmartSwitchView(View):
                 return self.get_chronopher_setting(request_dict, response)
             elif operation == 'add-or-edit-chronopher':  # 添加/编辑定时计划
                 return self.add_or_edit_chronopher(request_dict, response)
+            elif operation == 'edit-chronopher-status':  # 修改定时计划状态
+                return self.edit_chronopher_status(request_dict, response)
             elif operation == 'delete-chronopher':  # 删除定时计划
                 return self.delete_chronopher(request_dict, response)
             elif operation == 'edit-dimming-correction':  # 设置调光校正
@@ -223,7 +226,7 @@ class SmartSwitchView(View):
 
         if not all([device_id, repeat]):
             return response.json(444, {'param': 'deviceId,repeat'})
-        device_qs = Device_Info.objects.filter(id=device_id).values('serial_number')
+        device_qs = Device_Info.objects.filter(id=device_id).values('serial_number', 'userID')
         if not device_qs.exists():
             return response.json(173)
         if time_type_radio == 1:  # 时间点
@@ -254,25 +257,27 @@ class SmartSwitchView(View):
             return response.json(444, {'param': 'timeTypeRadio'})
         try:
             with transaction.atomic():
-                apscheduler_obj = ApschedulerObject()
+                celery_obj = CeleryBeatObj()
                 if is_edit:
                     if not chronopher_id:
-                        return response.json(444, {'param': 'timeTypeRadio'})
+                        return response.json(444, {'param': 'chronopherId'})
                     update_flag = SwitchChronopher.objects.filter(device_id=device_id, id=chronopher_id).update(
                         **chronopher_data)
                     if not update_flag:
                         return response.json(173)
-                    apscheduler_obj.del_job('switchchronopher_{}'.format(chronopher_id))
-                    apscheduler_obj.del_job('switchchronopher_{}_1'.format(chronopher_id))
-                    apscheduler_obj.del_job('switchchronopher_{}_2'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}_1'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}_2'.format(chronopher_id))
                 else:
                     switch_qs = SwitchChronopher.objects.create(**chronopher_data)
                     chronopher_id = switch_qs.id
 
                 # 设置定时任务
                 serial_number = device_qs[0]['serial_number']
+                user_id = device_qs[0]['userID']
+                tz = CommonService.get_user_tz(user_id)
                 topic_name = APSCHEDULER_TOPIC_NAME.format(serial_number)
-                if time_type_radio == 1:
+                if time_type_radio == 1:  # 时间点任务
                     task_id = 'switchchronopher_{}'.format(chronopher_id)
                     if time_point_device_will_doing in ['0', '1']:  # 开启或关闭
                         msg = {
@@ -288,9 +293,9 @@ class SmartSwitchView(View):
                             'slowTime': slow_open_or_close_speed
                         }
                     time_str = datetime.datetime.fromtimestamp(int(time_point))
-                    apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, task_id, repeat, time_str.hour,
-                                                    time_str.minute, [serial_number, topic_name, msg, task_id])
-                else:
+                    celery_obj.creat_crontab_task(tz, task_id, MQTT_TASK, time_str.minute,
+                                                  time_str.hour, repeat, args=[serial_number, topic_name, msg, task_id])
+                else:  # 时间段任务
                     start_hour = int(time_quantum_start_time / 60 // 60)
                     start_minute = int(time_quantum_start_time / 60 % 60)
                     end_hour = int(time_quantum_end_time / 60 // 60)
@@ -300,12 +305,12 @@ class SmartSwitchView(View):
                         end_task_id = 'switchchronopher_{}_2'.format(chronopher_id)  # 结束任务id
                         msg = {"taskId": chronopher_id,
                                "deviceSwitch": int(time_quantum_device_will_doing)}
-                        apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, begin_task_id, repeat, start_hour,
-                                                        start_minute, [serial_number, topic_name, msg, begin_task_id])
+                        celery_obj.creat_crontab_task(tz, begin_task_id, MQTT_TASK, start_minute, start_hour, repeat,
+                                                      args=[serial_number, topic_name, msg, begin_task_id])
                         msg = {"taskId": chronopher_id,
                                "deviceSwitch": 0 if int(time_quantum_device_will_doing) == 1 else 1}
-                        apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, end_task_id, repeat, end_hour,
-                                                        end_minute, [serial_number, topic_name, msg, end_task_id])
+                        celery_obj.creat_crontab_task(tz, end_task_id, MQTT_TASK, end_minute, end_hour, repeat,
+                                                      args=[serial_number, topic_name, msg, end_task_id])
 
                     else:  # 间隔任务
                         minute = int(time_quantum_device_will_doing)
@@ -317,14 +322,46 @@ class SmartSwitchView(View):
                             minute = start_minute
                         else:
                             hour = '{}-{}'.format(start_hour, end_hour)
-                            minute = '{}/{}'.format(start_minute, minute)
-                        apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, task_id, repeat, hour, minute,
-                                                        [serial_number, topic_name, msg, task_id])
+                            minute = '*/{}'.format(minute)
+                        celery_obj.creat_crontab_task(tz, task_id, MQTT_TASK, minute, hour, repeat,
+                                                      args=[serial_number, topic_name, msg, task_id])
                 return response.json(0)
         except Exception as e:
             print(e)
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
+    @staticmethod
+    def edit_chronopher_status(request_dict, response):
+        """
+        修改定时计划状态
+        @param request_dict: 请求参数
+        @request_dict deviceId: 设备id
+        @request_dict chronopherId: 定时计划id
+        @request_dict isExecute: 修改状态
+        @param response: 响应对象
+        @return: response
+        """
+        device_id = request_dict.get('deviceId', None)
+        chronopher_id = request_dict.get('chronopherId', None)
+        is_execute = request_dict.get('isExecute', None)
+        if not all([device_id, chronopher_id, is_execute]):
+            return response.json(444, {'param': 'deviceId,chronopherId,isExecute'})
+        try:
+            is_execute = int(is_execute)
+            celery_obj = CeleryBeatObj()
+            if is_execute:
+                celery_obj.enable_task('switchchronopher_{}'.format(chronopher_id))
+                celery_obj.enable_task('switchchronopher_{}_1'.format(chronopher_id))
+                celery_obj.enable_task('switchchronopher_{}_2'.format(chronopher_id))
+            else:
+                celery_obj.disable_task('switchchronopher_{}'.format(chronopher_id))
+                celery_obj.disable_task('switchchronopher_{}_1'.format(chronopher_id))
+                celery_obj.disable_task('switchchronopher_{}_2'.format(chronopher_id))
+            SwitchChronopher.objects.filter(device_id=device_id, id=chronopher_id).update(is_execute=is_execute)
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
     @staticmethod
     def delete_chronopher(request_dict, response):
         """
@@ -344,38 +381,15 @@ class SmartSwitchView(View):
             delete_flag = SwitchChronopher.objects.filter(device_id=device_id, id=chronopher_id).delete()
             if not delete_flag[0]:
                 return response.json(173)
-            apscheduler_obj = ApschedulerObject()
-            apscheduler_obj.del_job('switchchronopher_{}'.format(chronopher_id))  # 删除定时任务
-            apscheduler_obj.del_job('switchchronopher_{}_1'.format(chronopher_id))
-            apscheduler_obj.del_job('switchchronopher_{}_2'.format(chronopher_id))
+            celery_obj = CeleryBeatObj()
+            celery_obj.del_task('switchchronopher_{}'.format(chronopher_id))  # 删除定时任务
+            celery_obj.del_task('switchchronopher_{}_1'.format(chronopher_id))
+            celery_obj.del_task('switchchronopher_{}_2'.format(chronopher_id))
             return response.json(0)
         except Exception as e:
             print(e)
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
-    @staticmethod
-    def send_mqtt(serial_number, topic_name, msg, task_id):
-        """
-        定时发送mqtt, (不要随意更改,否则定时任务不执行)
-        @param serial_number: 设备序列号
-        @param topic_name: 主题
-        @param msg: 消息
-        @param task_id: 任务id
-        @return: response
-        """
-        now_time = int(time.time())
-        msg['implementTime'] = now_time
-        redis_obj = RedisObject()
-        is_lock = redis_obj.CONN.setnx(task_id + 'do_notify', 1)
-        redis_obj.CONN.expire(task_id + 'do_notify', 60)
-        if not is_lock:
-            return
-        result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
-        LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg,
-                                                                         now_time, task_id,
-                                                                         threading.get_ident(), os.getpid()))
-        redis_obj.del_data(key=task_id + 'do_notify')
-
     @staticmethod
     def create_chronopher_log(request_dict, response):
         """
@@ -481,12 +495,12 @@ class SmartSwitchView(View):
             SwitchDimmingSettings.objects.filter(device_id=device_id).delete()
             chronopher_qs = SwitchChronopher.objects.filter(device_id=device_id)
             if chronopher_qs.exists():
-                apscheduler_obj = ApschedulerObject()
+                celery_obj = CeleryBeatObj()
                 for chronopher in chronopher_qs:
                     chronopher_id = chronopher.id
-                    apscheduler_obj.del_job('switchchronopher_{}'.format(chronopher_id))  # 删除定时任务
-                    apscheduler_obj.del_job('switchchronopher_{}_1'.format(chronopher_id))
-                    apscheduler_obj.del_job('switchchronopher_{}_2'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}'.format(chronopher_id))  # 删除定时任务
+                    celery_obj.del_task('switchchronopher_{}_1'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}_2'.format(chronopher_id))
                 chronopher_qs.delete()
             SceneLog.objects.filter(device_id=device_id).delete()
             FamilyRoomDevice.objects.filter(device_id=device_id).delete()
@@ -507,12 +521,12 @@ class SmartSwitchView(View):
             SwitchDimmingSettings.objects.filter(device_id=device_id).delete()
             chronopher_qs = SwitchChronopher.objects.filter(device_id=device_id)
             if chronopher_qs.exists():
-                apscheduler_obj = ApschedulerObject()
+                celery_obj = CeleryBeatObj()
                 for chronopher in chronopher_qs:
                     chronopher_id = chronopher.id
-                    apscheduler_obj.del_job('switchchronopher_{}'.format(chronopher_id))  # 删除定时任务
-                    apscheduler_obj.del_job('switchchronopher_{}_1'.format(chronopher_id))
-                    apscheduler_obj.del_job('switchchronopher_{}_2'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}'.format(chronopher_id))  # 删除定时任务
+                    celery_obj.del_task('switchchronopher_{}_1'.format(chronopher_id))
+                    celery_obj.del_task('switchchronopher_{}_2'.format(chronopher_id))
                 chronopher_qs.delete()
             SceneLog.objects.filter(device_id=device_id).delete()
             msg = {

+ 1 - 1
Model/models.py

@@ -4533,7 +4533,7 @@ class SwitchChronopher(models.Model):
     time_quantum_device_will_doing = models.SmallIntegerField(default=0,
                                                               verbose_name='设备将会')  # 0: 开启, 1: 关闭, x: 开启/关闭切换间隔(分钟)
     slow_open_or_close_speed = models.SmallIntegerField(default=0, verbose_name='缓慢开/关速度')  # 秒
-    repeat = models.CharField(default=0, max_length=13, verbose_name=u'重复周期')  # 1-7:星期一到星期日
+    repeat = models.CharField(default=0, max_length=13, verbose_name=u'重复周期')  # 0-6:星期日到星期六
     is_execute = models.SmallIntegerField(default=1, verbose_name='是否执行')  # 0:执行;1:不执行
 
     class Meta:

+ 18 - 10
Object/CeleryBeatObject.py

@@ -8,6 +8,7 @@ from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSch
 
 from Model.models import TimeZoneInfo
 from Service.CommonService import CommonService
+from django.core.exceptions import ObjectDoesNotExist
 
 
 class CeleryBeatObj:
@@ -37,7 +38,7 @@ class CeleryBeatObj:
         PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)
 
     @staticmethod
-    def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='',  args=None, kwargs=None):
+    def creat_clocked_task(name, task, time_stamp=0, timezone_offset=0, time_string='', args=None, kwargs=None):
         """
         创建定时任务
         @param name: 任务名称
@@ -109,9 +110,12 @@ class CeleryBeatObj:
         @param name: 任务名称
         @return:
         """
-        periodic_task = PeriodicTask.objects.get(name=name)
-        periodic_task.enabled = False
-        periodic_task.save()
+        try:
+            periodic_task = PeriodicTask.objects.get(name=name)
+            periodic_task.enabled = False
+            periodic_task.save()
+        except ObjectDoesNotExist:
+            pass
 
     @staticmethod
     def enable_task(name):
@@ -120,9 +124,12 @@ class CeleryBeatObj:
         @param name: 任务名称
         @return:
         """
-        periodic_task = PeriodicTask.objects.get(name=name)
-        periodic_task.enabled = True
-        periodic_task.save()
+        try:
+            periodic_task = PeriodicTask.objects.get(name=name)
+            periodic_task.enabled = True
+            periodic_task.save()
+        except ObjectDoesNotExist:
+            pass
 
     @staticmethod
     def del_task(name):
@@ -131,7 +138,10 @@ class CeleryBeatObj:
         @param name: 任务名称
         @return:
         """
-        PeriodicTask.objects.get(name=name).delete()
+        IntervalSchedule.objects.filter(periodictask__name=name).delete()
+        CrontabSchedule.objects.filter(periodictask__name=name).delete()
+        ClockedSchedule.objects.filter(periodictask__name=name).delete()
+        PeriodicTask.objects.filter(name=name).delete()
 
     @staticmethod
     def update_task(name, **kwargs):
@@ -196,5 +206,3 @@ class CeleryBeatObj:
             periodic_task.crontab = schedule
 
         periodic_task.save()
-
-