#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
import json
import time
import uuid

import boto3
import requests
from django.views import View

from Ansjer.config import (
    BASE_DIR,
    AWS_IOT_GETS3_PULL_CHINA_ID,
    AWS_IOT_GETS3_PULL_CHINA_SECRET,
    AWS_IOT_GETS3_PULL_FOREIGN_ID,
    AWS_IOT_GETS3_PULL_FOREIGN_SECRET,
    AWS_ARN,
    AWS_IOT_SES_ACCESS_CHINA_REGION,
    AWS_IOT_SES_ACCESS_FOREIGN_REGION_ASIA,
    AWS_IOT_SES_ACCESS_FOREIGN_REGION_EUROPE,
    AWS_IOT_SES_ACCESS_FOREIGN_REGION_AMERICA,
    SERVER_TYPE,
)
from base64 import b64encode, encodebytes
from Controller.DeviceConfirmRegion import Device_Region
from Model.models import (
    Device_User,
    Device_Info,
    iotdeviceInfoModel,
    UIDCompanySerialModel,
    SerialNumberModel,
)
from Object.IOTCore.IotObject import IOTClient
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
import OpenSSL.crypto as ct


class IotCoreView(View):
    """Django view exposing AWS IoT Core related operations.

    The URL supplies an ``operation`` keyword argument; ``get``/``post`` hand
    the query/form parameters to :meth:`validate`, which dispatches to the
    matching handler. Every handler answers through ``ResponseObject.json``.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching on the ``operation`` URL kwarg."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation', None)
        return self.validate(operation, request.GET, request)

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching on the ``operation`` URL kwarg."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation', None)
        return self.validate(operation, request.POST, request)

    def validate(self, operation, request_dict, request):
        """Route ``operation`` to its handler.

        The first four operations are dispatched without a ``TokenObject``
        check (two of them verify a time-stamp token themselves). Every other
        operation requires a token whose ``code`` is 0; otherwise that code is
        returned. Unknown operations answer 404.
        """
        response = ResponseObject()
        response.lang = request_dict.get('lang', 'en')

        if operation == 'createKeysAndCertificate':
            return self.create_keys_and_certificate(request_dict, response, request)
        if operation == 'requestPublishMessage':
            return self.request_publish_message(request_dict, response, request)
        if operation == 'getS3PullKey':
            return self.get_s3_pull_key(request_dict, response, request)
        if operation == 'thingRegroup':
            return self.thing_regroup(request_dict, response, request)

        token = TokenObject(request_dict.get('token', None))
        if token.code != 0:
            return response.json(token.code)
        response.lang = token.lang

        if operation == 'clearIotCerm':
            # clear_Iot_Cerm takes the user id as its first argument; the
            # original call omitted it and always raised TypeError.
            # NOTE(review): assumes TokenObject exposes ``userID`` - confirm.
            user_id = getattr(token, 'userID', None)
            return self.clear_Iot_Cerm(user_id, request_dict, response)
        if operation == 'getIotInfo':
            return self.getIotInfo(request_dict, response)
        return response.json(404)

    # CVM registration: currently in use
    def create_keys_and_certificate(self, request_dict, response, request):
        """Return the IoT certificate of a device, creating it on first call.

        The device is identified either by ``uid`` + ``uid_code`` or, when
        ``uid`` is empty, by ``serial_number`` + ``serial_number_code``. The
        ``*_code`` value must decode (``CommonService.decode_data``) to the
        plain value. If no ``iotdeviceInfoModel`` row exists yet, keys and a
        certificate are created through ``IOTClient`` for the region derived
        from the caller's IP and stored; otherwise the stored ones are
        returned.

        Responds 444 when required parameters are missing, 13 when the
        time-stamp token check fails and 404 when the code does not match.
        """
        uid = request_dict.get('uid', '')
        token = request_dict.get('token', None)
        uid_code = request_dict.get('uid_code', None)
        language = request_dict.get('language', None)
        time_stamp = request_dict.get('time_stamp', None)
        device_version = request_dict.get('device_version', None)

        if not all([token, time_stamp, device_version, language]):
            return response.json(444, {'param': 'token, uid_code, time_stamp, device_version, language'})
        # Thing group names must not contain '.'. Done after validation so a
        # missing device_version yields 444 instead of an AttributeError.
        device_version = device_version.replace('.', '_')

        # Time-stamp token verification
        if not CommonService.check_time_stamp_token(token, time_stamp):
            return response.json(13)

        if not uid:
            # Identify the device by serial number
            serial_number = request_dict.get('serial_number', None)
            serial_number_code = request_dict.get('serial_number_code', None)
            if not all([serial_number, serial_number_code]):
                return response.json(444, {'param': 'serial_number, serial_number_code'})

            # The encoded serial number must decode to the plain one
            serial_number_code = CommonService.decode_data(serial_number_code)
            if serial_number != serial_number_code:
                return response.json(404)

            # Only the first six characters are looked up / stored
            serial = serial_number[0:6]
            try:
                SerialNumberModel.objects.get(serial_number=serial)
            except Exception:
                return response.json(444)

            ThingNameSuffix = serial_number  # thing name suffix
            iotdeviceInfo_qs = iotdeviceInfoModel.objects.filter(serial_number=serial)
        else:
            # Identify the device by uid
            if not uid_code:
                return response.json(444, {'param': 'uid_code'})
            # The encoded uid must decode to the plain one
            uid_code = CommonService.decode_data(uid_code)
            if uid != uid_code:
                return response.json(404)

            serial = ''  # serial_number is written as '' in the iot device info table
            ThingNameSuffix = uid  # thing name suffix
            iotdeviceInfo_qs = iotdeviceInfoModel.objects.filter(uid=uid)

        # Device already has a certificate: return the stored one
        if iotdeviceInfo_qs.exists():
            iot = iotdeviceInfo_qs[0]
            res = {
                'certificateId': iot.certificate_id,
                'certificatePem': iot.certificate_pem,
                'publicKey': iot.public_key,
                'privateKey': iot.private_key,
                'endpoint': iot.endpoint
            }
            return response.json(0, {'res': res})

        thingGroup = device_version + '_' + language
        ip = CommonService.get_ip_address(request)
        region_id = Device_Region().get_device_region(ip)

        iotClient = IOTClient(region_id)
        res = iotClient.create_keys_and_certificate(ThingNameSuffix, thingGroup, response)
        # res[0] holds the certificate data, res[1] the thing data
        cert_info = res[0]
        thing_info = res[1]
        token_iot_number = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()

        iotdeviceInfoModel.objects.create(uid=uid,
                                          serial_number=serial,
                                          endpoint=cert_info['endpoint'],
                                          certificate_id=cert_info['certificateId'],
                                          certificate_pem=cert_info['certificatePem'],
                                          public_key=cert_info['publicKey'],
                                          private_key=cert_info['privateKey'],
                                          thing_name=thing_info['ThingName'],
                                          thing_groups=thing_info['thingGroupName'],
                                          token_iot_number=token_iot_number
                                          )
        res = {
            'certificateId': cert_info['certificateId'],
            'certificatePem': cert_info['certificatePem'],
            'publicKey': cert_info['publicKey'],
            'privateKey': cert_info['privateKey'],
            'endpoint': cert_info['endpoint']
        }
        return response.json(0, {'res': res})

    def thing_regroup(self, request_dict, response, request):
        """Move a thing into the group named ``<device_version>_<language>``.

        The thing is ``Ansjer_Device_<uid>`` or, when ``uid`` is empty,
        ``Ansjer_Device_<serial_number>``. The target group is created when no
        group with that name prefix is listed, the thing is moved from its
        first current group to the target group, and ``Device_Info.version``
        is updated. Any exception is reported as 500 with its ``repr``.
        """
        uid = request_dict.get('uid', '')
        token = request_dict.get('token', None)
        language = request_dict.get('language', None)
        time_stamp = request_dict.get('time_stamp', None)
        device_version = request_dict.get('device_version', None)

        if not all([token, language, time_stamp, device_version]):
            # Was a set literal ({'param: ...'}); now a dict like the other handlers.
            return response.json(444, {'param': 'token, language, time_stamp, device_version'})

        # Time-stamp token verification
        if not CommonService.check_time_stamp_token(token, time_stamp):
            return response.json(13)

        ip = CommonService.get_ip_address(request)
        region_id = Device_Region().get_device_region(ip)
        iotClient = IOTClient(region_id)

        ThingNameSuffix = uid
        serial_number = None
        if not uid:
            # Identify the device by serial number
            serial_number = request_dict.get('serial_number', None)
            if not serial_number:
                return response.json(444)
            ThingNameSuffix = serial_number

        thingName = 'Ansjer_Device_' + ThingNameSuffix
        # Thing group names must not contain '.'
        new_thingGroupName = (device_version + '_' + language).replace('.', '_')

        try:
            # Current group of the thing; an empty list raises IndexError,
            # which the handler below reports as 500.
            list_groups_res = iotClient.client.list_thing_groups_for_thing(thingName=thingName, maxResults=1)
            old_thingGroupName = list_groups_res['thingGroups'][0]['groupName']

            # Create the target group when it is not listed.
            # NOTE(review): this is a prefix filter, so a longer group name
            # sharing the prefix also counts as "exists" - confirm intended.
            list_thing_groups_res = iotClient.client.list_thing_groups(namePrefixFilter=new_thingGroupName,
                                                                       maxResults=1, recursive=False)
            if not list_thing_groups_res['thingGroups']:
                thingGroupProperties = {
                    "thingGroupDescription": "OTA",
                    "attributePayload": {
                        "attributes": {
                            "update_time": "0"
                        },
                        "merge": False  # overwrite on update instead of merging
                    }
                }
                iotClient.client.create_thing_group(thingGroupName=new_thingGroupName,
                                                    thingGroupProperties=thingGroupProperties)

            iotClient.client.update_thing_groups_for_thing(thingName=thingName,
                                                           thingGroupsToAdd=[new_thingGroupName],
                                                           thingGroupsToRemove=[old_thingGroupName])

            # Update the stored device version
            if uid:
                Device_Info.objects.filter(UID=uid).update(version=device_version)
            else:
                Device_Info.objects.filter(serial_number=serial_number).update(version=device_version)
            return response.json(0)
        except Exception as e:
            print(e)
            return response.json(500, repr(e))

    def clear_Iot_Cerm(self, userID, request_dict, response):
        """Delete the stored IoT record(s) of ``Ansjer_Device_<serial_number>``.

        ``userID`` is accepted but not used. Responds 0 after the lookup
        (whether or not a row was deleted) and 444 when ``serial_number`` is
        missing.
        """
        serial_number = request_dict.get('serial_number', None)
        if not serial_number:
            return response.json(444)

        iot = iotdeviceInfoModel.objects.filter(thing_name="Ansjer_Device_" + serial_number)
        if iot.exists():
            iot.delete()
        return response.json(0)

    def request_publish_message(self, request_dict, response, request):
        """Publish an MQTT command to a device through the IoT HTTPS endpoint.

        Per the original note: Alexa asks IoT Core to send an MQTT message
        telling the device to start or stop streaming, or to wake up.

        ``UID`` selects the device and ``MSG`` is sent as ``{'command': MSG}``.
        The topic is ``<thing suffix>_power_topic`` when ``MSG`` contains
        ``'Turn'`` and ``<thing suffix>_rtsp_topic`` otherwise. Responds 0 on
        an HTTP 200 whose ``message`` is ``'OK'``, 10043 when the device or its
        IoT record is unknown, 10044 when publishing fails and 500 on any
        exception.
        """
        UID = request_dict.get('UID', None)
        MSG = request_dict.get('MSG', None)

        if not all([UID, MSG]):
            return response.json(444)

        try:
            # Look up the device to derive its thing name suffix
            device_info_qs = Device_Info.objects.filter(UID=UID).values('UID', 'serial_number')
            if not device_info_qs.exists():
                return response.json(10043)
            uid = device_info_qs[0]['UID']
            serial_number = device_info_qs[0]['serial_number']
            # A non-empty serial number names the thing; otherwise fall back to
            # the uid. ``or`` also covers a NULL serial number, which the
            # original ``!= ''`` test let through as a None filter value.
            thing_name_suffix = serial_number or uid

            # Collect what is needed to build the request URL
            iot = iotdeviceInfoModel.objects.filter(thing_name__contains=thing_name_suffix).values(
                'thing_name', 'endpoint', 'token_iot_number')
            if not iot.exists():
                return response.json(10043)
            # Thing name on IoT Core is 'Ansjer_Device_' + serial number + company code / uid;
            # [14:] strips that 14-character prefix.
            thing_name = iot[0]['thing_name'][14:]
            endpoint = iot[0]['endpoint']
            Token = iot[0]['token_iot_number']

            topic_suffix = '_power_topic' if 'Turn' in MSG else '_rtsp_topic'
            topic_name = thing_name + topic_suffix  # MQTT topic

            # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
            # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
            # POST to this url to publish the MQTT message
            url = 'https://{}/topics/{}'.format(endpoint, topic_name)
            signature = CommonService.rsa_sign(Token)  # signature over the token
            headers = {'x-amz-customauthorizer-name': 'Ansjer_Iot_Auth', 'Token': Token,
                       'x-amz-customauthorizer-signature': signature}
            params = {'command': MSG}
            r = requests.post(url=url, headers=headers, json=params, timeout=2)
            if r.status_code == 200 and r.json()['message'] == 'OK':
                return response.json(0)
            # Publishing failed
            return response.json(10044)
        except Exception as e:
            return response.json(500, repr(e))

    def get_s3_pull_key(self, request_dict, response, request):
        """Return the S3 access data matching the region of a device's endpoint.

        ``UID`` selects the device; its stored IoT endpoint decides which
        credential set :meth:`get_s3_key_return_msg` returns. Responds 444
        without ``UID``, 10043 when the device or its IoT record is unknown and
        500 on any exception.

        NOTE(review): ``validate`` dispatches this operation before its token
        check, and no other check is visible here, so an AWS key/secret pair is
        returned to any caller that knows a UID - confirm this is intended.
        """
        UID = request_dict.get('UID', None)
        if not all([UID]):
            return response.json(444)

        try:
            # Look up the device to derive its thing name suffix
            device_info_qs = Device_Info.objects.filter(UID=UID).values('UID', 'serial_number')
            if not device_info_qs.exists():
                return response.json(10043)
            uid = device_info_qs[0]['UID']
            serial_number = device_info_qs[0]['serial_number']
            # A non-empty serial number names the thing; otherwise fall back to
            # the uid (also when the serial number is NULL).
            thing_name_suffix = serial_number or uid

            iot = iotdeviceInfoModel.objects.filter(thing_name__contains=thing_name_suffix).values(
                'thing_name', 'endpoint', 'token_iot_number')
            if not iot.exists():
                return response.json(10043)
            MSG = self.get_s3_key_return_msg(iot[0]['endpoint'])
            return response.json(0, MSG)
        except Exception as e:
            return response.json(500, repr(e))

    def get_s3_key_return_msg(self, endpoint):
        """Build the S3 access payload for the region found in ``endpoint``.

        Returns a dict with ``AccessKeyId``, ``AccessKeySecret``,
        ``bucket_name``, ``arn`` and ``region_name``.

        Raises:
            ValueError: if ``endpoint`` names none of the handled regions
                (previously an ``UnboundLocalError`` on ``region_name``).
        """
        if 'cn-northwest-1' in endpoint:
            key = AWS_IOT_GETS3_PULL_CHINA_ID
            secret = AWS_IOT_GETS3_PULL_CHINA_SECRET
            arn = AWS_ARN[0]
            region_name = AWS_IOT_SES_ACCESS_CHINA_REGION
        else:
            key = AWS_IOT_GETS3_PULL_FOREIGN_ID
            secret = AWS_IOT_GETS3_PULL_FOREIGN_SECRET
            arn = AWS_ARN[1]
            foreign_regions = (
                ('ap-southeast-1', AWS_IOT_SES_ACCESS_FOREIGN_REGION_ASIA),
                ('eu-west-1', AWS_IOT_SES_ACCESS_FOREIGN_REGION_EUROPE),
                ('us-east-1', AWS_IOT_SES_ACCESS_FOREIGN_REGION_AMERICA),
            )
            # No early exit: like the original independent checks, the last
            # matching region wins.
            matched = [region for marker, region in foreign_regions if marker in endpoint]
            if not matched:
                raise ValueError('unsupported IoT endpoint region: {!r}'.format(endpoint))
            region_name = matched[-1]

        return {
            'AccessKeyId': key,
            'AccessKeySecret': secret,
            'bucket_name': 'asj-log',
            'arn': arn,
            'region_name': region_name,
        }

    def getIotInfo(self, request_dict, response):
        """Return the stored IoT ``endpoint`` and ``token_iot_number``.

        The record is looked up by the first six characters of
        ``serial_number``. Responds 444 without ``serial_number``, 173 when no
        record exists and 500 on any exception.
        """
        serial_number = request_dict.get('serial_number', None)
        if not serial_number:
            return response.json(444)

        try:
            serial_number = serial_number[0:6]
            iot_info_qs = iotdeviceInfoModel.objects.filter(serial_number=serial_number).values(
                'endpoint', 'token_iot_number')
            if not iot_info_qs.exists():
                return response.json(173)
            first = iot_info_qs[0]
            res = {'endpoint': first['endpoint'], 'token_iot_number': first['token_iot_number']}
            return response.json(0, res)
        except Exception as e:
            return response.json(500, repr(e))