"""Request temporary OSS credentials from Alibaba Cloud STS and try them out.

The script assumes a RAM role with a restrictive inline policy and prints
the STS response.  When started with ``--upload`` it additionally uses the
temporary credentials to append data to an object in the bucket.

The long-lived AccessKey pair is read from the environment variables
``ALIBABA_CLOUD_ACCESS_KEY_ID`` and ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` so
that it is never stored in the source file.
"""
import json
import os
import sys

import oss2
from aliyunsdkcore import client
from aliyunsdksts.request.v20150401 import AssumeRoleRequest

# The endpoint and the STS region below point at Shenzhen; fill in the values
# of the region that actually hosts the bucket.
endpoint = 'oss-cn-shenzhen.aliyuncs.com'
region_id = 'cn-shenzhen'

# SECURITY: an AccessKey pair used to be hard-coded here.  Any key that was
# ever committed must be considered leaked and should be rotated.
access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')

bucket_name = 'cnvod1'

# Only keys below this prefix are covered by the inline policy.
object_prefix = 'test/'

# role_arn is the resource name (ARN) of the RAM role to assume.
role_arn = 'acs:ram::1901342792446414:role/stsoss'

# The inline policy only allows requests coming from this source address.
allowed_source_ip = '120.237.157.184'


def build_policy(bucket, prefix, source_ip):
    """Return the inline policy document attached to the AssumeRole call.

    The policy allows ``oss:PutObject`` and ``oss:DeleteObject`` on objects
    under ``prefix`` in ``bucket``, and only for requests whose source IP is
    ``source_ip``.

    The resource is derived from ``bucket`` so the policy always targets the
    bucket the script really uses; the earlier literal named a different
    bucket ('cloudvod1') than ``bucket_name`` ('cnvod1').
    """
    return {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "oss:PutObject",
                    "oss:DeleteObject",
                ],
                "Resource": ["acs:oss:*:*:{}/{}*".format(bucket, prefix)],
                "Effect": "Allow",
                "Condition": {
                    "IpAddress": {
                        "acs:SourceIp": source_ip,
                    },
                },
            },
        ],
    }


def request_sts_token():
    """Ask STS for temporary credentials and return the decoded JSON reply.

    Uses the RAM user's AccessKey pair to assume ``role_arn`` for 3600
    seconds with the policy produced by :func:`build_policy`.

    ``do_action_with_exception`` is used so that a failed call raises instead
    of handing back an error document that would only fail later with a
    confusing ``KeyError`` on ``token['Credentials']``.
    """
    sts_client = client.AcsClient(access_key_id, access_key_secret, region_id)

    request = AssumeRoleRequest.AssumeRoleRequest()
    # Ask for a JSON formatted response.
    request.set_accept_format('json')
    request.set_RoleArn(role_arn)
    request.set_RoleSessionName('uid13241234123')
    request.set_DurationSeconds(3600)
    policy = build_policy(bucket_name, object_prefix, allowed_source_ip)
    request.set_Policy(Policy=json.dumps(policy))

    return json.loads(sts_client.do_action_with_exception(request))


def append_with_token(token):
    """Append a first chunk to an object using the temporary credentials.

    ``token`` is the decoded STS response; its ``Credentials`` entry supplies
    the AccessKeyId, AccessKeySecret and SecurityToken for ``oss2.StsAuth``.
    Returns the result object of ``Bucket.append_object``.
    """
    credentials = token['Credentials']
    # Build the StsAuth instance from the temporary credentials.
    auth = oss2.StsAuth(credentials['AccessKeyId'],
                        credentials['AccessKeySecret'],
                        credentials['SecurityToken'])
    print(auth)

    # Bind the StsAuth instance to the bucket.
    bucket = oss2.Bucket(auth, endpoint, bucket_name)

    # The key has to live under object_prefix, otherwise the inline policy
    # rejects the request.  Position 0 means this is the first append.
    result = bucket.append_object(object_prefix + 'mio', 0,
                                  'content of first append')
    print(result)
    # For a later append, take the position from bucket.head_object or from
    # result.next_position of the previous append.
    return result


def main():
    """Print a fresh STS token; with ``--upload`` also run the append demo."""
    if not access_key_id or not access_key_secret:
        sys.exit('Set ALIBABA_CLOUD_ACCESS_KEY_ID and '
                 'ALIBABA_CLOUD_ACCESS_KEY_SECRET before running this script.')

    token = request_sts_token()
    print(token)

    # The script used to stop unconditionally after printing the token, which
    # left the upload part unreachable.  Keep that as the default behaviour
    # but make the upload reachable on request.
    if '--upload' in sys.argv[1:]:
        append_with_token(token)


if __name__ == '__main__':
    main()