ソースを参照

获取阿里云 oss sts令牌接口

locky 2 週間 前
コミット
c4a4e137b3
1 ファイル変更94 行追加0 行削除
  1. 94 0
      Controller/CloudStorage.py

+ 94 - 0
Controller/CloudStorage.py

@@ -86,6 +86,9 @@ class CloudStorageView(View):
         elif operation == 'getsignsts':  # 设备调用,获取sts令牌
             ip = CommonService.get_ip_address(request)
             return self.do_get_sign_sts(request_dict, ip, response)
+        elif operation == 'getsignstsoss':  # 设备调用,获取sts令牌
+            ip = CommonService.get_ip_address(request)
+            return self.do_get_sign_sts_oss(request_dict, ip, response)
         elif operation == 'storeplaylist':  # 设备调用,设备把视频上传到s3,同时把视频数据信息传给服务器,服务器存储播放内容
             return self.do_store_playlist(request_dict, response)
         elif operation == 'signplaym3u8':  # 根据sts播放m3u8 视频流
@@ -603,6 +606,97 @@ class CloudStorageView(View):
         except Exception as e:
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
+    @staticmethod
+    def do_get_sign_sts_oss(request_dict, ip, response):
+        """
+        获取 阿里云 oss sts令牌
+        @param request_dict: 请求数据
+        @param ip: ip地址
+        @request_dict uidToken: uid_token
+        @param response: 响应
+        @return: response
+        """
+        uid = request_dict.get('uid', None)
+        channel = request_dict.get('channel', None)
+
+        if not all([uid, channel]):
+            return response.json(444, 'uidToken')
+
+        try:
+            # 阿里云 oss sts
+            from aliyunsdkcore import client
+            from aliyunsdkcore.request import CommonRequest
+            import json
+            from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, OSS_ROLE_ARN
+            
+            storage = '{uid}/vod{channel}/'.format(uid=uid, channel=channel)
+            bucket_name = 'asj-test-1'  # 存储桶名称
+            endpoint = 'oss-cn-shenzhen.aliyuncs.com'  # OSS endpoint
+            region_id = 'cn-shenzhen'  # 地域ID
+            
+            # 创建 AcsClient 实例
+            clt = client.AcsClient(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, region_id)
+            
+            # 创建 CommonRequest 请求
+            request = CommonRequest(product="Sts", version='2015-04-01', action_name='AssumeRole')
+            request.set_method('POST')
+            request.set_protocol_type('https')
+            request.add_query_param('RoleArn', OSS_ROLE_ARN)
+            request.add_query_param('RoleSessionName', '{role_name}'.format(role_name=uid + '_' + str(channel)))
+            request.add_query_param('DurationSeconds', '3600')  # 修改为1小时(3600秒),符合阿里云STS的限制
+            request.set_accept_format('JSON')
+            
+            # 设置权限策略
+            resource_access = "acs:oss:*:*:{}/*".format(bucket_name)
+            
+            policy = {
+                "Version": "1",
+                "Statement": [
+                    {
+                        "Effect": "Allow",
+                        "Action": ["oss:*"],   # 可以根据需要限制操作权限
+                        "Resource": [resource_access]
+                    }
+                ]
+            }
+            
+            request.add_query_param('Policy', json.dumps(policy))
+            
+            # 发送请求,获取响应
+            try:
+                body = clt.do_action_with_exception(request)
+                token = json.loads(body.decode('utf-8'))
+                
+                # 构建返回结果
+                res = {
+                    'AccessKeyId': token['Credentials']['AccessKeyId'],
+                    'AccessKeySecret': token['Credentials']['AccessKeySecret'],
+                    'SecurityToken': token['Credentials']['SecurityToken'],
+                    'Expiration': token['Credentials']['Expiration'],
+                    'expire': 3600,
+                    'endpoint': endpoint,
+                    'bucket_name': bucket_name,
+                    'arn': token['AssumedRoleUser']['Arn'],
+                    'code': 0,
+                    'storage': storage,
+                    'endTime': 9999999999,
+                    'ip': ip,
+                    'region': region_id,
+                    'bucket_mold': 0
+                }
+            except Exception as e:
+                # 捕获所有异常并返回自定义错误信息
+                return response.json(
+                    500, 
+                    'Internal error!',
+                    'STS服务异常: {}'.format(str(e)),
+                    500
+                )
+
+            return JsonResponse(status=200, data=res)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
     @staticmethod
     def do_query_vod_list(request_dict, user_id, response):  # 获取视频播放列表
         """