1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- # -*- coding: utf-8 -*-
- """
- @Time : 2022/4/26 9:01
- @Auth : Locky
- @File :SensorGatewayController.py
- @IDE :PyCharm
- """
- import random
- import string
- from django.views import View
- from Object.ResponseObject import ResponseObject
- from Service.CommonService import CommonService
class SensorGateway(View):
    """Class-based view that dispatches sensor-gateway operations.

    The ``operation`` URL keyword argument selects the handler. GET and POST
    are handled the same way; they differ only in which parameter mapping
    (``request.GET`` or ``request.POST``) is passed on to the handler.
    """

    def get(self, request, *args, **kwargs):
        """Handle a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle a POST request using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to the matching handler.

        @param request_dict: request parameters (``request.GET`` or ``request.POST``)
        @param request: the incoming request object (not used by the dispatch itself)
        @param operation: operation name taken from the URL keyword arguments
        @return: whatever the selected handler returns, or ``response.json(404)``
                 when the operation is not recognised
        """
        response = ResponseObject()
        if operation == 'getSensorId':  # hand a device id back to the sensor
            return self.getSensorId(request_dict, response)
        elif operation == 'registerToAWSIoTCore':  # register with AWS IoT Core
            return self.registerToAWSIoTCore(request_dict, response)
        else:
            return response.json(404)

    @staticmethod
    def getSensorId(request_dict, response):
        """
        Issue a device id to a sensor device.

        @param request_dict: request parameters
        @request_dict time_stamp: timestamp
        @request_dict time_stamp_token: timestamp token
        @param response: response object
        @return: ``response.json(0, {'sensor_id': ...})`` carrying a random
                 six-character id made of ASCII letters and digits;
                 ``response.json(444, ...)`` when a parameter is missing or empty;
                 ``response.json(13)`` when the timestamp token check fails;
                 ``response.json(500, ...)`` when an exception is raised

        NOTE(review): nothing here checks the generated id against ids that
        were issued earlier, so uniqueness is not guaranteed by this method.
        """
        time_stamp = request_dict.get('time_stamp', None)
        time_stamp_token = request_dict.get('time_stamp_token', None)
        if not all([time_stamp, time_stamp_token]):
            return response.json(444, {'param': 'time_stamp, time_stamp_token'})
        try:
            # Validate the timestamp token before issuing anything.
            if not CommonService.check_time_stamp_token_without_distance(time_stamp_token, time_stamp):
                return response.json(13)
            # Draw every character independently. random.sample() picks without
            # replacement, which made ids with a repeated character impossible
            # and needlessly shrank the id space.
            alphabet = string.ascii_letters + string.digits
            sensor_id = ''.join(random.choice(alphabet) for _ in range(6))
            return response.json(0, {'sensor_id': sensor_id})
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def registerToAWSIoTCore(request_dict, response):
        """
        Register a sensor device to AWS IoT Core.

        @param request_dict: request parameters
        @request_dict sensor_id: device id
        @request_dict time_stamp: timestamp
        @request_dict time_stamp_token: timestamp token
        @param response: response object
        @return: ``response.json(444, ...)`` when a parameter is missing or empty,
                 otherwise ``response.json(0)``

        NOTE(review): the success path currently only returns code 0; no
        registration call or token check is performed in this method yet.
        """
        sensor_id = request_dict.get('sensor_id', None)
        time_stamp = request_dict.get('time_stamp', None)
        time_stamp_token = request_dict.get('time_stamp_token', None)
        if not all([sensor_id, time_stamp, time_stamp_token]):
            return response.json(444, {'param': 'sensor_id, time_stamp, time_stamp_token'})
        try:
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
|