瀏覽代碼

请求mqtt发布消息封装成通用方法,修改消息推送间隔通过mqtt通知设备

locky 3 年之前
父節點
當前提交
b868f96ec1
共有 3 個文件被更改,包括 88 次插入89 次删除
  1. 4 44
      Controller/AiController.py
  2. 46 45
      Controller/DetectControllerV2.py
  3. 38 0
      Service/CommonService.py

+ 4 - 44
Controller/AiController.py

@@ -209,13 +209,9 @@ class AiView(View):
             topic_name = 'ansjer/generic/{}'.format(uid)
             if status == 0:     # 关闭
                 hasAiService.update(**qs_data)
-                # UidPushModel.objects.filter(uid_set__uid=uid).delete()
-                # 状态为0的时候删除redis缓存数据
-                # self.do_delete_redis(uid)
-
                 # mqtt通知设备关闭AI识别功能
-                msg = {'commandType': 'AIDisable'},
-                req_success = self.requestPublishMqtt(uid, topic_name, msg)
+                msg = {'commandType': 'AIDisable'}
+                req_success = CommonService.req_publish_mqtt_msg(uid, topic_name, msg)
                 if not req_success:
                     return response.json(10044)
                 return response.json(0)
@@ -276,7 +272,7 @@ class AiView(View):
                               'aiIdentificationUrl': aiIdentificationUrl,
                           }
                       },
-                req_success = self.requestPublishMqtt(uid, topic_name, msg)
+                req_success = CommonService.req_publish_mqtt_msg(uid, topic_name, msg)
                 if not req_success:
                     return response.json(10044)
                 return response.json(0, {'aiIdentificationUrl': aiIdentificationUrl, 'endTime': endTime, 'etk': etk})
@@ -291,7 +287,7 @@ class AiView(View):
         try:
             ai_server_qs = AiService.objects.filter(uid=uid).values('detect_status', 'detect_group')
             if not ai_server_qs.exists():
-                return response,json(173)
+                return response.json(173)
             res = {
                 'detect_status': ai_server_qs[0]['detect_status'],
                 'detect_group': ai_server_qs[0]['detect_group'],
@@ -300,42 +296,6 @@ class AiView(View):
         except Exception as e:
             return response.json(500, repr(e))
 
-    def requestPublishMqtt(self, thing_name, topic_name, msg):
-        # 通用发布MQTT主题通知
-        if not all([msg, thing_name, topic_name]):
-            return False
-
-        try:
-            # 获取数据组织将要请求的url
-            iot = iotdeviceInfoModel.objects.filter(
-                thing_name__icontains=thing_name).values(
-                'endpoint', 'token_iot_number')
-            if not iot.exists():
-                return False
-            endpoint = iot[0]['endpoint']
-            Token = iot[0]['token_iot_number']
-
-            # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
-            # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
-            # post请求url发布MQTT消息
-            url = 'https://{}/topics/{}'.format(endpoint, topic_name)
-            authorizer_name = 'Ansjer_Iot_Auth'
-            signature = CommonService.rsa_sign(Token)  # Token签名
-            headers = {
-                'x-amz-customauthorizer-name': authorizer_name,
-                'Token': Token,
-                'x-amz-customauthorizer-signature': signature}
-            r = requests.post(url=url, headers=headers, json=msg, timeout=2)
-            if r.status_code == 200:
-                res = r.json()
-                if res['message'] == 'OK':
-                    return True
-                return False
-            else:
-                return False
-        except Exception as e:
-            return False
-
     def do_commodity_list(self, userID, request_dict, response):
         uid = request_dict.get('uid', None)
         lang = request_dict.get('lang', 'en')

+ 46 - 45
Controller/DetectControllerV2.py

@@ -25,7 +25,8 @@ from Ansjer.config import DETECT_PUSH_DOMAIN, DETECT_PUSH_DOMAINS, DETECT_PUSH_D
                             SERVER_DOMAIN, SERVER_DOMAIN_SSL, \
                             OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, \
                             JPUSH_CONFIG, FCM_CONFIG, APNS_CONFIG, BASE_DIR, APNS_MODE,  SERVER_TYPE
-from Model.models import Device_Info, VodHlsModel, Equipment_Info, UidSetModel, UidPushModel, CompanyModel, SysMsgModel
+from Model.models import Device_Info, VodHlsModel, Equipment_Info, UidSetModel, UidPushModel, CompanyModel, SysMsgModel, \
+    AiService
 from Object.ETkObject import ETkObject
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
@@ -127,54 +128,54 @@ class DetectControllerViewV2(View):
                     return response.json(904)
             else:
                 return response.json(173)
-            dvqs = Device_Info.objects.filter(userID_id=userID, UID=uid)
-            # 获取用户区域
-            # ip = self.ip
-            # ipInfo = CommonService.getIpIpInfo(ip=ip, lang='EN')
-            # area = ipInfo['country_name']
-            # if area == 'China':
-            #     DETECT_PUSH_DOMAIN_V2 = 'cn.push.dvema.com'
-            # else:
-            #     DETECT_PUSH_DOMAIN_V2 = 'en.push.dvema.com'
 
-            nowTime = int(time.time())
-            if not dvqs.exists():
+            # 判断用户是否拥有设备
+            device_info_qs = Device_Info.objects.filter(userID_id=userID, UID=uid)
+            if not device_info_qs.exists():
                 return response.json(14)
-            # 修改状态
+
+            # 更新或创建uid_set数据
+            nowTime = int(time.time())
+            uid_set_data = {}
+
+            # 设置开关状态,0:关闭,1:开启
             if status:
-                dvqs.update(NotificationMode=int(status))
-            uid_set_qs = UidSetModel.objects.filter(uid=uid)
-            # uid配置信息是否存在
+                status = int(status)
+                uid_set_data['detect_status'] = status
+                device_info_qs.update(NotificationMode=status)
+
+            # 检测类型
+            if detect_group:
+                uid_set_data['detect_group'] = detect_group
+
+            # 设置消息推送间隔
+            if interval:
+                interval = int(interval)
+                uid_set_data['detect_interval'] = interval
+                # 开通了ai服务的设备,通过mqtt通知设备修改消息推送间隔
+                ai_service_qs = AiService.objects.filter(uid=uid, use_status=1, endTime__lte=nowTime)
+                if ai_service_qs.exists():
+                    topic_name = 'ansjer/generic/{}'.format(uid)
+                    msg = {
+                        'commandType': 'AIState',
+                        'payload': {
+                            'IntervalTime': interval
+                        }
+                    }
+                    req_success = CommonService.req_publish_mqtt_msg(uid, topic_name, msg)
+                    if not req_success:
+                        return response.json(10044)
 
+            uid_set_qs = UidSetModel.objects.filter(uid=uid)
             if uid_set_qs.exists():
                 uid_set_id = uid_set_qs[0].id
-                qs_data = {
-                    'updTime': nowTime,
-                }
-                if status:
-                    qs_data['detect_status'] = int(status)
-                if interval:
-                    qs_data['detect_interval'] = int(interval)
-                if detect_group:
-                    qs_data['detect_group'] = detect_group
-                print(qs_data)
-                uid_set_qs.update(**qs_data)
-
+                uid_set_data['updTime'] = nowTime
+                uid_set_qs.update(**uid_set_data)
             else:
-                qs_data = {
-                    'uid': uid,
-                    'addTime': nowTime,
-                    'updTime': nowTime,
-                }
-                if status:
-                    qs_data['detect_status'] = int(status)
-                if interval:
-                    qs_data['detect_interval'] = int(interval)
-                if detect_group:
-                    qs_data['detect_group'] = detect_group
-                # 添加设备配置
-                uid_set_qs = UidSetModel.objects.create(**qs_data)
-
+                uid_set_data['uid'] = uid
+                uid_set_data['addTime'] = nowTime
+                uid_set_data['updTime'] = nowTime
+                uid_set_qs = UidSetModel.objects.create(**uid_set_data)
                 uid_set_id = uid_set_qs.id
 
             # 初始化UidPushModel推送表
@@ -196,11 +197,11 @@ class DetectControllerViewV2(View):
                 UidPushModel.objects.create(**uid_push_create_dict)
                 return response.json(0)
 
-            if status == '0':
+            if status == 0:
                 # 状态为0的时候删除redis缓存数据
                 self.do_delete_redis(uid)
                 return response.json(0)
-            elif status == '1':
+            elif status == 1:
                 uid_push_qs = UidPushModel.objects.filter(userID_id=userID, m_code=m_code, uid_set__uid=uid)
 
                 if uid_push_qs.exists():
@@ -233,7 +234,7 @@ class DetectControllerViewV2(View):
                     UidPushModel.objects.create(**uid_push_create_dict)
 
                 if interval:
-                    self.do_delete_redis(uid, int(interval))
+                    self.do_delete_redis(uid, interval)
                 else:
                     self.do_delete_redis(uid)
                 # utko = UidTokenObject()

+ 38 - 0
Service/CommonService.py

@@ -6,6 +6,7 @@ import time
 from pathlib import Path
 from random import Random
 import ipdb
+import requests
 import simplejson as json
 from django.core import serializers
 from django.http import HttpResponseRedirect
@@ -15,6 +16,7 @@ from Ansjer.config import BASE_DIR, UNICODE_ASCII_CHARACTER_SET, SERVER_DOMAIN_S
 import OpenSSL.crypto as ct
 from base64 import encodebytes
 from Controller.CheckUserData import RandomStr
+from Model.models import iotdeviceInfoModel
 from Service.ModelService import ModelService
 
 
@@ -452,6 +454,42 @@ class CommonService:
             print(e)
             return False
 
+    def req_publish_mqtt_msg(self, thing_name, topic_name, msg):
+        # 通用发布MQTT消息函数
+        if not all([thing_name, topic_name, msg]):
+            return False
+
+        try:
+            # 获取数据组织将要请求的url
+            iot = iotdeviceInfoModel.objects.filter(
+                thing_name__icontains=thing_name).values(
+                'endpoint', 'token_iot_number')
+            if not iot.exists():
+                return False
+            endpoint = iot[0]['endpoint']
+            Token = iot[0]['token_iot_number']
+
+            # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
+            # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
+            # post请求url发布MQTT消息
+            url = 'https://{}/topics/{}'.format(endpoint, topic_name)
+            authorizer_name = 'Ansjer_Iot_Auth'
+            signature = self.rsa_sign(Token)  # Token签名
+            headers = {
+                'x-amz-customauthorizer-name': authorizer_name,
+                'Token': Token,
+                'x-amz-customauthorizer-signature': signature}
+            r = requests.post(url=url, headers=headers, json=msg, timeout=2)
+            if r.status_code == 200:
+                res = r.json()
+                if res['message'] == 'OK':
+                    return True
+                return False
+            else:
+                return False
+        except Exception as e:
+            return False
+
     @staticmethod
     def rsa_sign(Token):
         # 私钥签名Token