AmazonS3Util.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : AmazonS3Util.py
  4. @Time : 2022/8/11 14:00
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import logging
  10. import traceback
  11. import boto3
  12. import botocore
  13. from boto3.session import Session
  14. from botocore import client
  15. from botocore.exceptions import ClientError
  # Module-level logger, bound to the logging channel named 'info'.
  logger = logging.getLogger('info')
  class AmazonS3Util:
      """Convenience wrapper around the boto3 S3 client and resource APIs.

      Two handles are created at construction time and used by every method:

      * ``client_conn`` - a low-level S3 client configured for Signature
        Version 4 (``s3v4``).
      * ``session_conn`` - an S3 service resource created from a ``Session``
        that carries the same credentials and region.
      """

      def __init__(self, aws_access_key_id, secret_access_key, region_name):
          """
          Store the credentials and open the S3 client and resource handles.

          @param aws_access_key_id: AWS access key id
          @param secret_access_key: AWS secret access key
          @param region_name: AWS region name
          """
          self.access_id = aws_access_key_id
          self.access_secret = secret_access_key
          self.region_name = region_name
          session = Session(
              aws_access_key_id=aws_access_key_id,
              aws_secret_access_key=secret_access_key,
              region_name=region_name,
          )
          self.client_conn = boto3.client(
              's3',
              aws_access_key_id=aws_access_key_id,
              aws_secret_access_key=secret_access_key,
              config=botocore.client.Config(signature_version='s3v4'),
              region_name=region_name,
          )
          self.session_conn = session.resource('s3')

      def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None):
          """
          Upload a file-like object to an S3 bucket.
          https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file

          @param bucket: bucket name (required)
          @param file_key: destination object key (path + file name)
          @param file_obj: file-like object to upload
          @param extra_args: extra upload arguments, e.g. an ACL setting
          @return: True when the upload succeeded, otherwise False
          """
          try:
              target = self.session_conn.Bucket(bucket).Object(file_key)
              target.upload_fileobj(file_obj, ExtraArgs=extra_args)
              return True
          except Exception:
              # Best-effort API: report through the module logger (instead of
              # stdout) and signal the failure through the return value.
              logger.error('具体错误{}'.format(traceback.format_exc()))
              return False

      def generate_file_obj_url(self, bucket, file_key, expires_in=3600):
          """
          Generate a presigned GET URL for an object.
          https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url

          @param bucket: bucket name
          @param file_key: object key
          @param expires_in: URL lifetime in seconds (3600 is the boto3 default)
          @return: the presigned URL
          """
          return self.client_conn.generate_presigned_url(
              ClientMethod='get_object',
              Params={
                  'Bucket': bucket,
                  'Key': file_key,
              },
              ExpiresIn=expires_in,
          )

      def delete_obj(self, bucket, file_key):
          """
          Delete an object.
          https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete

          @param bucket: bucket name
          @param file_key: object key
          @return: True when the delete call succeeded, otherwise False
          """
          try:
              self.session_conn.Bucket(bucket).Object(file_key).delete()
              return True
          except Exception:
              logger.error('具体错误{}'.format(traceback.format_exc()))
              return False

      def bucket_exists(self, bucket_name):
          """
          Check whether a bucket exists and is accessible with these credentials.

          @param bucket_name: bucket name
          @return: True when ``head_bucket`` succeeds; False when it raises
                   ``ClientError`` (bucket missing or access denied)
          """
          try:
              self.client_conn.head_bucket(Bucket=bucket_name)
          except ClientError:
              logger.warning(
                  "存储桶 {} 不存在,或者你没有权限.".format(bucket_name))
              return False
          logger.info("存储桶 {} 存在.".format(bucket_name))
          return True

      def get_object(self, bucket, key):
          """
          Check whether an object exists by requesting it.

          @param bucket: bucket name
          @param key: object key
          @return: True when the object exists, False when S3 reports NoSuchKey
          """
          try:
              response = self.client_conn.get_object(Bucket=bucket, Key=key)
          except self.client_conn.exceptions.NoSuchKey:
              return False
          # The response carries an open streaming body; close it so the
          # underlying HTTP connection is released back to the pool.
          body = response.get('Body')
          if body is not None:
              body.close()
          return True

      def download_object(self, bucket, key, file_name):
          """
          Download an object to a local file.

          @param bucket: bucket name
          @param key: object key
          @param file_name: local destination path (including file name)
          @return: True when the download succeeded, otherwise False
          """
          try:
              self.client_conn.download_file(bucket, key, file_name)
              return True
          except Exception:
              # Return a real boolean: the exception's args tuple returned
              # previously was usually truthy, so failures looked like success.
              logger.exception(
                  'Failed to download s3://%s/%s to %s', bucket, key, file_name)
              return False

      def copy_obj(self, source_bucket, to_bucket, file_key):
          """
          Copy an object to another bucket, keeping the same key.

          @param source_bucket: source bucket name
          @param to_bucket: destination bucket name
          @param file_key: object key (used for both source and destination)
          """
          source = {
              'Bucket': source_bucket,
              'Key': file_key,
          }
          self.session_conn.meta.client.copy(source, to_bucket, file_key)

      def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None):
          """
          Copy a single object, optionally setting the destination storage class.

          @param source_bucket: source bucket name
          @param source_object: source object key
          @param target_bucket: destination bucket name
          @param target_object: destination object key
          @param StorageClass: storage class for the copy; omitted when falsy
          @return: None
          """
          copy_kwargs = {
              'CopySource': {
                  'Bucket': source_bucket,
                  'Key': source_object,
              },
          }
          if StorageClass:
              copy_kwargs['StorageClass'] = StorageClass
          self.session_conn.Object(target_bucket, target_object).copy_from(**copy_kwargs)

      def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None, expires_in=7200):
          """
          Generate a presigned PUT URL for uploading an object.

          @param bucket_name: bucket name
          @param obj_key: object key
          @param storage_class: storage class to sign into the request; omitted when falsy
          @param expires_in: URL lifetime in seconds (defaults to 7200)
          @return: the presigned URL
          """
          params = {
              'Bucket': bucket_name,
              'Key': obj_key,
          }
          if storage_class:
              params['StorageClass'] = storage_class
          return self.session_conn.meta.client.generate_presigned_url(
              'put_object',
              Params=params,
              ExpiresIn=expires_in,
          )

      def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None):
          """
          Copy every object under a prefix into a (flat) destination prefix.

          Each destination key is ``target_prefix + '/' + <last path segment of
          the source key>``, so any sub-directory structure below ``prefix`` is
          flattened.

          @param source_bucket: source bucket name
          @param target_bucket: destination bucket name
          @param prefix: source key prefix, e.g. AUS000247LTCLY/vod1/1686043996
          @param target_prefix: destination key prefix, e.g. app/algorithm-shop/1686043996
          @param storage_class: storage class for the copies; omitted when falsy
          @return: None
          """
          matches = self.session_conn.Bucket(source_bucket).objects.filter(Prefix=prefix)
          for item in matches:
              source_key = item.key
              # Keep only the file name and re-root it under target_prefix.
              target_key = f'{target_prefix}/' + source_key.split('/')[-1]
              self.copy_single_obj(
                  source_bucket, source_key, target_bucket, target_key,
                  StorageClass=storage_class,
              )

      def get_object_size(self, bucket_name, object_key):
          """
          Return the size of an object.

          :param bucket_name: string, bucket name
          :param object_key: string, object key
          :return: int, the object's ``content_length``; 0 when it cannot be read
          """
          try:
              return self.session_conn.Object(bucket_name, object_key).content_length
          except Exception:
              logger.warning(
                  'Failed to read size of s3://%s/%s', bucket_name, object_key,
                  exc_info=True)
              return 0

      @staticmethod
      def _key_is_past_end_time(key, end_time):
          """Return True when the third '/'-separated key segment, read as an int, exceeds end_time."""
          try:
              return int(end_time) < int(key.split('/')[2])
          except (IndexError, TypeError, ValueError):
              # Key does not follow the expected layout: never stop early on it.
              return False

      def get_object_list(self, bucket_name, prefix, start_after='', end_time=None):
          """
          List the objects under a prefix, following pagination.

          :param bucket_name: string, bucket name
          :param prefix: string, key prefix to list
          :param start_after: string, list only keys after this key; ignored when empty
          :param end_time: optional upper bound; paging stops once the last key of
                           a page has a third '/'-separated segment (read as an
                           int) greater than this value
          :return: list of the ``Contents`` entries returned by ``list_objects_v2``;
                   an empty list when nothing matches or the listing fails
          """
          request = {'Bucket': bucket_name, 'Prefix': prefix}
          if start_after:
              request['StartAfter'] = start_after
          contents = []
          try:
              while True:
                  page = self.client_conn.list_objects_v2(**request)
                  # 'Contents' is absent when a page holds no objects.
                  page_items = page.get('Contents') or []
                  contents.extend(page_items)
                  # 'NextContinuationToken' is only present on truncated pages;
                  # its absence marks the final page.
                  next_token = page.get('NextContinuationToken')
                  if not next_token:
                      break
                  if end_time and page_items and self._key_is_past_end_time(
                          page_items[-1]['Key'], end_time):
                      break
                  request['ContinuationToken'] = next_token
          except Exception:
              logger.exception(
                  'Failed to list objects under s3://%s/%s', bucket_name, prefix)
              return []
          return contents