| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 | # @Author    : Rocky# @File      : RTCController.py# @Time      : 2025/3/14 8:49import randomimport timefrom hashlib import sha1import hmacimport base64import binasciifrom optparse import OptionParserfrom django.views.generic.base import Viewfrom Model.models import Device_Infofrom Object.Enums.RTCEnum import RTCConfigEnumfrom Service.CommonService import CommonServiceclass RTCView(View):    def get(self, request, *args, **kwargs):        request.encoding = 'utf-8'        operation = kwargs.get('operation')        return self.validation(request.GET, request, operation)    def post(self, request, *args, **kwargs):        request.encoding = 'utf-8'        operation = kwargs.get('operation')        return self.validation(request.POST, request, operation)    def validation(self, request_dict, request, operation):        token_code, user_id, response = CommonService.verify_token_get_user_id(request_dict, request)        if token_code != 0:            return response.json(token_code)        if operation == 'getToken':  # 获取token            return self.get_token(request_dict, response)    @classmethod    def get_token(cls, request_dict, response):        """        获取token        https://cloud.baidu.com/doc/RTC/s/Qjxbh7jpu#%E5%9C%A8appserver%E4%B8%8A%E9%83%A8%E7%BD%B2token%E7%94%9F%E6%88%90%E6%9C%8D%E5%8A%A1        @param request_dict:        @param response:        @return: res        """        uid = request_dict.get('uid', None)        if not uid:            return response.json(444)        try:            # 查询主用户user id            device_info_qs = Device_Info.objects.filter(UID=uid).values('vodPrimaryUserID')            if not device_info_qs.exists():                return response.json(173)            pri_user_id = device_info_qs[0]['vodPrimaryUserID']            # 签名            now_time = int(time.time())            expect_time = 999999999            random_str = cls.generate_random_str()            data = "ACS{}{}{}{}{}{}".format(RTCConfigEnum.AppID.value, now_time, random_str, uid, pri_user_id, expect_time)            signature = hmac.new(RTCConfigEnum.AppKey.value, data, sha1).digest()  # .encode('base64').rstrip()            signature = binascii.b2a_hex(signature)            token = '{}{}{}{}{}'.format('004', signature, now_time, random_str, expect_time)            res = {                'token': token            }            return response.json(0, res)        except Exception as e:            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))    @staticmethod    def generate_random_str():        # 生成一个随机整数        random_int = random.randint(0, 2 ** 32 - 1)  # 生成一个 0 到 2^32 - 1 的随机整数        # 将整数转换为 16 进制字符串        hex_string = hex(random_int)[2:]  # 去掉前缀 '0x'        # 补齐至 8 位        random_str = hex_string.zfill(8)        return random_str
 |