IotCoreController.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import hashlib
  4. import logging
  5. import threading
  6. import time
  7. import uuid
  8. from collections import OrderedDict
  9. import requests
  10. from django.views import View
  11. from Ansjer.config import AWS_IOT_GETS3_PULL_CHINA_ID, AWS_IOT_GETS3_PULL_CHINA_SECRET, \
  12. AWS_IOT_GETS3_PULL_FOREIGN_ID, AWS_IOT_GETS3_PULL_FOREIGN_SECRET, AWS_ARN, AWS_IOT_SES_ACCESS_CHINA_REGION, \
  13. AWS_IOT_SES_ACCESS_FOREIGN_REGION_ASIA, AWS_IOT_SES_ACCESS_FOREIGN_REGION_EUROPE, \
  14. AWS_IOT_SES_ACCESS_FOREIGN_REGION_AMERICA, REGION_ID_LIST, ALEXA_DOMAIN
  15. from Model.models import Device_Info, iotdeviceInfoModel, SerialNumberModel, UidSetModel
  16. from Object.IOTCore.IotObject import IOTClient
  17. from Object.ResponseObject import ResponseObject
  18. from Object.TokenObject import TokenObject
  19. from Service.CommonService import CommonService
  20. class IotCoreView(View):
  21. def get(self, request, *args, **kwargs):
  22. request.encoding = 'utf-8'
  23. request_dict = request.GET
  24. operation = kwargs.get('operation', None)
  25. return self.validate(operation, request_dict, request)
  26. def post(self, request, *args, **kwargs):
  27. request.encoding = 'utf-8'
  28. request_dict = request.POST
  29. operation = kwargs.get('operation', None)
  30. return self.validate(operation, request_dict, request)
  31. def validate(self, operation, request_dict, request):
  32. response = ResponseObject()
  33. lang = request_dict.get('lang', 'en')
  34. response.lang = lang
  35. if operation == 'createKeysAndCertificate': # 设备注册到IoT core
  36. return self.create_key_and_certificate(request_dict, response)
  37. elif operation == 'requestPublishMessage':
  38. return self.request_publish_message(request_dict, response)
  39. elif operation == 'getS3PullKey':
  40. return self.get_s3_pull_key(request_dict, response, request)
  41. elif operation == 'thingRegroup': # OTA升级成功重新分组
  42. return self.thing_regroup(request_dict, response)
  43. elif operation == 'pcGetIotInfo':
  44. return self.pcGetIotInfo(request_dict, response)
  45. else:
  46. token = TokenObject(request_dict.get('token', None))
  47. if token.code != 0:
  48. return response.json(token.code)
  49. response.lang = token.lang
  50. if operation == 'clearIotCerm':
  51. return self.clear_Iot_Cerm(request_dict, response)
  52. elif operation == 'getIotInfo':
  53. return self.getIotInfo(request_dict, response)
  54. else:
  55. return response.json(404)
  56. # 设备注册到aws iot core
  57. @staticmethod
  58. def create_key_and_certificate(request_dict, response):
  59. logger = logging.getLogger('info')
  60. logger.info('设备注册到aws iot core请求参数:{}'.format(request_dict))
  61. token = request_dict.get('token', None)
  62. language = request_dict.get('language', None)
  63. time_stamp = request_dict.get('time_stamp', None)
  64. device_version = request_dict.get('device_version', None)
  65. if not all([token, language, time_stamp, device_version]):
  66. return response.json(444, {'param': 'token, language, time_stamp, device_version'})
  67. device_version = device_version.replace('.', '_') # 物品组命名不能包含'.'
  68. try:
  69. # 时间戳token校验
  70. no_rtc = request_dict.get('no_rtc', None)
  71. if no_rtc:
  72. if not CommonService.check_time_stamp_token_without_distance(token, time_stamp):
  73. return response.json(13)
  74. else:
  75. if not CommonService.check_time_stamp_token(token, time_stamp):
  76. return response.json(13)
  77. uid = request_dict.get('uid', '')
  78. uid_code = request_dict.get('uid_code', None)
  79. company_mark = '11A'
  80. if not uid: # 传序列号
  81. serial_number = request_dict.get('serial_number', None)
  82. serial_number_code = request_dict.get('serial_number_code', None)
  83. if not all([serial_number, serial_number_code]):
  84. return response.json(444, {'param': 'serial_number, serial_number_code'})
  85. # 序列号编码解码校验
  86. serial_number_code = CommonService.decode_data(serial_number_code)
  87. if serial_number != serial_number_code:
  88. return response.json(404)
  89. serial = serial_number[0:6]
  90. company_mark = serial_number[-3:]
  91. try:
  92. SerialNumberModel.objects.get(serial_number=serial)
  93. except:
  94. return response.json(173)
  95. thing_name_suffix = serial_number # 物品名后缀
  96. iot_device_info_qs = iotdeviceInfoModel.objects.filter(serial_number=serial)
  97. else: # 传uid
  98. # uid编码解码校验
  99. uid_code = CommonService.decode_data(uid_code)
  100. if uid != uid_code:
  101. return response.json(404)
  102. serial = '' # iot_deviceInfo表写入serial_number为''
  103. thing_name_suffix = uid # 物品名后缀
  104. iot_device_info_qs = iotdeviceInfoModel.objects.filter(uid=uid)
  105. # 判断设备是否已注册过
  106. if iot_device_info_qs.exists():
  107. iot = iot_device_info_qs[0]
  108. res = {
  109. 'certificateId': iot.certificate_id,
  110. 'certificatePem': iot.certificate_pem,
  111. 'publicKey': iot.public_key,
  112. 'privateKey': iot.private_key,
  113. 'endpoint': iot.endpoint
  114. }
  115. return response.json(0, {'res': res})
  116. else:
  117. # 获取并判断region_id是否有效
  118. region_id = CommonService.confirm_region_id()
  119. if region_id not in REGION_ID_LIST:
  120. return response.json(444, {'invalid region_id': region_id})
  121. iotClient = IOTClient(region_id)
  122. # 拼接物品名
  123. thingName = CommonService.get_thing_name(company_mark, thing_name_suffix)
  124. thingGroup = device_version + '_' + language
  125. res = iotClient.register_to_iot_core(thingName, thingGroup, response)
  126. token_iot_number = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()
  127. data = {
  128. 'uid': uid,
  129. 'serial_number': serial,
  130. 'endpoint': res[0]['endpoint'],
  131. 'certificate_id': res[0]['certificateId'],
  132. 'certificate_pem': res[0]['certificatePem'],
  133. 'public_key': res[0]['publicKey'],
  134. 'private_key': res[0]['privateKey'],
  135. 'thing_name': res[1]['ThingName'],
  136. 'thing_groups': res[1]['thingGroupName'],
  137. 'token_iot_number': token_iot_number
  138. }
  139. # 创建线程调用Alexa接口
  140. threading.Thread(target=call_alexa_api, kwargs=data).start()
  141. iotdeviceInfoModel.objects.create(**data)
  142. res = {
  143. 'certificateId': res[0]['certificateId'],
  144. 'certificatePem': res[0]['certificatePem'],
  145. 'publicKey': res[0]['publicKey'],
  146. 'privateKey': res[0]['privateKey'],
  147. 'endpoint': res[0]['endpoint']
  148. }
  149. return response.json(0, {'res': res})
  150. except Exception as e:
  151. print(e)
  152. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  153. @staticmethod
  154. def thing_regroup(request_dict, response):
  155. """
  156. OTA升级成功重新分组
  157. @param request_dict: 请求参数
  158. @request_dict uid: 设备uid
  159. @request_dict region_id: 地区id
  160. @request_dict time_stamp: 时间戳
  161. @request_dict time_stamp_token: 时间戳token
  162. @request_dict device_version: 设备版本
  163. @request_dict language: 版本语言
  164. @param response: 响应对象
  165. @return: response
  166. """
  167. uid = request_dict.get('uid', '')
  168. time_stamp = request_dict.get('time_stamp', None)
  169. time_stamp_token = request_dict.get('time_stamp_token', None)
  170. device_version = request_dict.get('device_version', None)
  171. language = request_dict.get('language', None)
  172. if not all([time_stamp, time_stamp_token, device_version, language]):
  173. return response.json(444)
  174. # 时间戳token校验
  175. no_rtc = request_dict.get('no_rtc', None)
  176. if no_rtc:
  177. if not CommonService.check_time_stamp_token_without_distance(time_stamp_token, time_stamp):
  178. return response.json(13)
  179. else:
  180. if not CommonService.check_time_stamp_token(time_stamp_token, time_stamp):
  181. return response.json(13)
  182. # 获取并判断region_id是否有效
  183. region_id = CommonService.confirm_region_id()
  184. if region_id not in REGION_ID_LIST:
  185. return response.json(444, {'invalid region_id': region_id})
  186. company_mark = '11A'
  187. thing_name_suffix = uid
  188. if not uid:
  189. # 使用序列号
  190. serial_number = request_dict.get('serial_number', None)
  191. if not serial_number:
  192. return response.json(444, {{'error param': 'uid and serial_number'}})
  193. company_mark = serial_number[-3:]
  194. thing_name_suffix = serial_number
  195. uid = CommonService.query_uid_with_serial(serial_number)
  196. uid_set_qs = UidSetModel.objects.filter(uid=uid)
  197. thingName = CommonService.get_thing_name(company_mark, thing_name_suffix)
  198. new_thingGroupName = (device_version + '_' + language).replace('.', '_') # 物品组命名不能包含'.'
  199. try:
  200. iotClient = IOTClient(int(region_id))
  201. # 获取旧物品组
  202. list_groups_res = iotClient.client.list_thing_groups_for_thing(thingName=thingName, maxResults=1)
  203. old_thingGroupName = list_groups_res['thingGroups'][0]['groupName']
  204. # 没有新物品组则创建
  205. list_thing_groups_res = iotClient.client.list_thing_groups(namePrefixFilter=new_thingGroupName
  206. , maxResults=1, recursive=False)
  207. if not list_thing_groups_res['thingGroups']:
  208. attributes = {
  209. "update_time": "0"
  210. }
  211. thingGroupProperties = {
  212. "thingGroupDescription": "OTA",
  213. "attributePayload": {
  214. "attributes": attributes,
  215. "merge": False # 更新时覆盖掉而不是合并
  216. }
  217. }
  218. iotClient.client.create_thing_group(thingGroupName=new_thingGroupName
  219. , thingGroupProperties=thingGroupProperties)
  220. iotClient.client.update_thing_groups_for_thing(thingName=thingName
  221. , thingGroupsToAdd=[new_thingGroupName]
  222. , thingGroupsToRemove=[old_thingGroupName])
  223. # 更新设备版本信息
  224. uid_set_qs.update(version=device_version)
  225. return response.json(0)
  226. except Exception as e:
  227. print(e)
  228. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  229. def clear_Iot_Cerm(self, request_dict, response):
  230. serial_number = request_dict.get('serial_number', None)
  231. if serial_number:
  232. iot = iotdeviceInfoModel.objects.filter(thing_name="Ansjer_Device_" + serial_number)
  233. if iot.exists():
  234. iot.delete()
  235. return response.json(0)
  236. else:
  237. return response.json(444)
  238. # Alexa请求IoT Core下发MQTT消息通知设备开始或停止推流,或唤醒设备
  239. @staticmethod
  240. def request_publish_message(request_dict, response):
  241. UID = request_dict.get('UID', None)
  242. rtsp = request_dict.get('rtsp', None)
  243. enable = request_dict.get('enable', '1')
  244. if not all([UID, rtsp]):
  245. return response.json(444)
  246. try:
  247. thing_name = CommonService.query_serial_with_uid(UID) # 存在序列号则为使用序列号作为物品名
  248. topic_name = 'ansjer/generic/{}'.format(thing_name)
  249. msg = OrderedDict(
  250. [
  251. ('alexaRtspCommand', rtsp),
  252. ('enable', int(enable)),
  253. ]
  254. )
  255. if not CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg):
  256. return response.json(10044)
  257. return response.json(0)
  258. except Exception as e:
  259. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  260. def get_s3_pull_key(self, request_dict, response, request):
  261. # 通用发布主题通知
  262. UID = request_dict.get('UID', None)
  263. if not all([UID]):
  264. return response.json(444)
  265. try:
  266. # 获取检查uid的序列号,如果没有序列号,不使用MQTT下发消息
  267. device_info_qs = Device_Info.objects.filter(UID=UID).values('UID', 'serial_number')
  268. if not device_info_qs.exists():
  269. return response.json(10043)
  270. uid = device_info_qs[0]['UID']
  271. serial_number = device_info_qs[0]['serial_number']
  272. # 如果device_info表的serial_number不为空,物品名为'Ansjer_Device_序列号'
  273. thing_name_suffix = serial_number if serial_number != '' else uid
  274. # 获取数据组织将要请求的url
  275. iot = iotdeviceInfoModel.objects.filter(thing_name__contains=thing_name_suffix).values('thing_name',
  276. 'endpoint',
  277. 'token_iot_number')
  278. if not iot.exists():
  279. return response.json(10043)
  280. endpoint = iot[0]['endpoint']
  281. MSG = self.get_s3_key_return_msg(endpoint)
  282. return response.json(0, MSG)
  283. except Exception as e:
  284. # print(e)
  285. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  286. def get_s3_key_return_msg(self, endpoint):
  287. MSG = {}
  288. if 'cn-northwest-1' in endpoint:
  289. key = AWS_IOT_GETS3_PULL_CHINA_ID
  290. secret = AWS_IOT_GETS3_PULL_CHINA_SECRET
  291. arn = AWS_ARN[0]
  292. region_name = AWS_IOT_SES_ACCESS_CHINA_REGION
  293. else:
  294. key = AWS_IOT_GETS3_PULL_FOREIGN_ID
  295. secret = AWS_IOT_GETS3_PULL_FOREIGN_SECRET
  296. arn = AWS_ARN[1]
  297. if 'ap-southeast-1' in endpoint:
  298. region_name = AWS_IOT_SES_ACCESS_FOREIGN_REGION_ASIA
  299. if 'eu-west-1' in endpoint:
  300. region_name = AWS_IOT_SES_ACCESS_FOREIGN_REGION_EUROPE
  301. if 'us-east-1' in endpoint:
  302. region_name = AWS_IOT_SES_ACCESS_FOREIGN_REGION_AMERICA
  303. MSG['AccessKeyId'] = key
  304. MSG['AccessKeySecret'] = secret
  305. MSG['bucket_name'] = 'asj-log'
  306. MSG['arn'] = arn
  307. MSG['region_name'] = region_name
  308. return MSG
  309. def getIotInfo(self, request_dict, response):
  310. # 获取IoT数据
  311. serial_number = request_dict.get('serial_number', None)
  312. uid = request_dict.get('uid', None)
  313. if not uid and not serial_number:
  314. return response.json(444)
  315. try:
  316. if serial_number:
  317. serial_number = serial_number[0:6]
  318. iot_info_qs = iotdeviceInfoModel.objects.filter(serial_number=serial_number). \
  319. values('endpoint', 'token_iot_number')
  320. else:
  321. iot_info_qs = iotdeviceInfoModel.objects.filter(uid=uid). \
  322. values('endpoint', 'token_iot_number')
  323. if not iot_info_qs.exists():
  324. return response.json(173)
  325. endpoint = iot_info_qs[0]['endpoint']
  326. token_iot_number = iot_info_qs[0]['token_iot_number']
  327. res = {'endpoint': endpoint, 'token_iot_number': token_iot_number}
  328. return response.json(0, res)
  329. except Exception as e:
  330. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  331. def pcGetIotInfo(self, request_dict, response):
  332. # PC工具获取IoT数据
  333. serial_number = request_dict.get('serial_number', None)
  334. uid = request_dict.get('uid', None)
  335. if not uid and not serial_number:
  336. return response.json(444)
  337. try:
  338. if serial_number:
  339. serial_number = serial_number[0:6]
  340. iot_info_qs = iotdeviceInfoModel.objects.filter(serial_number=serial_number). \
  341. values('endpoint', 'token_iot_number')
  342. else:
  343. iot_info_qs = iotdeviceInfoModel.objects.filter(uid=uid). \
  344. values('endpoint', 'token_iot_number')
  345. if not iot_info_qs.exists():
  346. return response.json(173)
  347. endpoint = iot_info_qs[0]['endpoint']
  348. token_iot_number = iot_info_qs[0]['token_iot_number']
  349. res = {'endpoint': endpoint, 'token_iot_number': token_iot_number}
  350. return response.json(0, res)
  351. except Exception as e:
  352. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  353. def call_alexa_api(**kwargs):
  354. url = ALEXA_DOMAIN + 'device/creatOrUpdateIotInfo'
  355. requests.post(url=url, data=kwargs, timeout=30)