12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- # @Author : Rocky
- # @File : RTCController.py
- # @Time : 2025/3/14 8:49
- import random
- import time
- from hashlib import sha1
- import hmac
- import base64
- import binascii
- from optparse import OptionParser
- from django.views.generic.base import View
- from Model.models import Device_Info
- from Object.Enums.RTCEnum import RTCConfigEnum
- from Service.CommonService import CommonService
class RTCView(View):
    """Django view exposing RTC operations.

    GET and POST requests are both funnelled into :meth:`validation`, which
    verifies the caller's token and then dispatches on the ``operation``
    keyword argument captured from the URL.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET: take parameters from ``request.GET`` and dispatch."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: take parameters from ``request.POST`` and dispatch."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Verify the caller's token, then route to the requested operation.

        @param request_dict: request parameters (``request.GET`` or ``request.POST``)
        @param request: the incoming request object
        @param operation: operation name taken from the URL kwargs
        @return: the response produced by the selected handler, or an error
                 response when token verification fails or the operation is
                 not recognised
        """
        token_code, user_id, response = CommonService.verify_token_get_user_id(request_dict, request)
        if token_code != 0:
            return response.json(token_code)
        if operation == 'getToken':  # issue an RTC token
            return self.get_token(request_dict, response)
        # Previously an unrecognised operation fell through and returned None,
        # which Django rejects because a view must return a response object.
        # NOTE(review): 444 is the code this view already uses for a bad/missing
        # request parameter; confirm it is the preferred code for an unknown
        # operation.
        return response.json(444)

    @classmethod
    def get_token(cls, request_dict, response):
        """
        Build a signed RTC token for the device identified by ``uid``.

        Reference (deploying the token generation service on the app server):
        https://cloud.baidu.com/doc/RTC/s/Qjxbh7jpu#%E5%9C%A8appserver%E4%B8%8A%E9%83%A8%E7%BD%B2token%E7%94%9F%E6%88%90%E6%9C%8D%E5%8A%A1

        @param request_dict: request parameters; must contain a non-empty ``uid``
        @param response: response object used to build the reply
        @return: ``response.json(0, {'token': ...})`` on success;
                 ``response.json(444)`` when ``uid`` is missing;
                 ``response.json(173)`` when no ``Device_Info`` row matches ``uid``;
                 ``response.json(500, ...)`` with line/exception details on any error
        """
        uid = request_dict.get('uid', None)
        if not uid:
            return response.json(444)
        try:
            # Look up the primary user id bound to this device.
            device_info_qs = Device_Info.objects.filter(UID=uid).values('vodPrimaryUserID')
            if not device_info_qs.exists():
                return response.json(173)
            pri_user_id = device_info_qs[0]['vodPrimaryUserID']

            # Assemble the string to sign.
            now_time = int(time.time())
            # NOTE(review): fixed value carried over unchanged; presumably the
            # token validity/expiry field of the RTC token format - confirm
            # against the vendor documentation linked above.
            expect_time = 999999999
            random_str = cls.generate_random_str()
            data = "ACS{}{}{}{}{}{}".format(
                RTCConfigEnum.AppID.value, now_time, random_str, uid, pri_user_id, expect_time
            )

            # hmac on Python 3 only accepts bytes for both key and message, so
            # encode the message and (when it is configured as str) the key.
            app_key = RTCConfigEnum.AppKey.value
            if isinstance(app_key, str):
                app_key = app_key.encode('utf-8')
            # hexdigest() yields the lowercase hex signature as str; formatting
            # the bytes returned by binascii.b2a_hex would embed "b'...'".
            signature = hmac.new(app_key, data.encode('utf-8'), sha1).hexdigest()

            token = '{}{}{}{}{}'.format('004', signature, now_time, random_str, expect_time)
            res = {
                'token': token
            }
            return response.json(0, res)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def generate_random_str():
        """
        Return a random 32-bit value as an 8-character lowercase hex string.

        @return: hex string without the ``0x`` prefix, left-padded with zeros
                 to exactly 8 characters
        """
        # Random integer in [0, 2^32 - 1], rendered as zero-padded hex.
        return format(random.randint(0, 2 ** 32 - 1), '08x')
|