# coding=utf-8
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +
#        ┏┓   ┏┓+ +
#       ┏┛┻━━━┛┻┓ + +
#       ┃       ┃
#       ┃   ━   ┃ ++ + + +
#       ████━████ ┃+
#       ┃       ┃ +
#       ┃   ┻   ┃
#       ┃       ┃ + +
#       ┗━┓   ┏━┛
#         ┃   ┃
#         ┃   ┃ + + + +
#         ┃   ┃    Codes are far away from bugs with the animal protecting
#         ┃   ┃ +  (May the mythical beast bless this code: no bugs)
#         ┃   ┃
#         ┃   ┃  +
#         ┃   ┗━━━┓ + +
#         ┃       ┣┓
#         ┃       ┏┛
#         ┗┓┓┏━┳┓┏┛ + + + +
#          ┃┫┫ ┃┫┫
#          ┗┻┛ ┗┻┛+ + + +
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +
"""Email signed download links for yesterday's hourly P2P relay log files.

For each of the 24 hours of the previous day the script builds an expiring,
signed URL of the form

    http://<host>/Relay<YYYYMMDDHH>.txt?st=<token>&e=<expiry>

and sends all links in a single HTML message through Amazon SES.

Nothing is sent at import time; run the file as a script to send the email.
"""

import base64
import datetime
import hashlib
import os
import time
import traceback

# Relay server that hosts the hourly log files.  Other hosts that appeared in
# this file as commented-out alternatives: 13.56.141.156:81, 35.176.238.187:81.
RELAY_HOST = '111.230.145.16:81'

# Shared secret for the signed links.  The token is md5(secret + path + expiry),
# which has to match the server-side rule (the original comment quotes an nginx
# ``secure_link_md5`` directive).  Can be overridden without editing the file.
SIGN_SECRET = os.environ.get('RELAY_SIGN_SECRET', 'ansjer')

# How long a generated link stays valid, in seconds.
LINK_TTL_SECONDS = 3600

# SECURITY: the AWS key pair used to be hard-coded here.  It is now taken from
# the standard environment variables; when they are unset the values are None
# and boto3 falls back to its default credential chain.  The key that was
# committed previously should be rotated.
AWS_ACCESS_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_ACCESS_SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_REGION = 'us-east-1'
COMPANY_EMAIL = 'user_server@nsst.com'

# Subject line of the digest ("China P2P log file").
EMAIL_SUBJECT = '中国P2P日志文件'

# One row of the HTML body per log file.
# NOTE(review): the template text visible in this file contained neither
# placeholder, so the two ``replace`` calls had nothing to substitute and the
# mail body came out empty.  This markup is a minimal reconstruction --
# confirm it against the intended layout.
LINK_TEMPLATE = '<p><a href="{{url}}">{{time}}</a></p>\n'


def getTimeDict(times):
    """Return the 24 hourly stamps ``YYYYMMDDHH`` for the day of *times*.

    :param times: a ``date``/``datetime``; only its calendar day is used.
    :return: list of 24 strings, hour ``00`` through ``23``.
    """
    day = times.strftime("%Y%m%d")
    return ['{}{:02d}'.format(day, hour) for hour in range(24)]


def md5s(text, isBackByte=False):
    """Return the MD5 digest of *text*.

    :param text: ``str`` (hashed as UTF-8) or a bytes-like object.
    :param isBackByte: when true return the raw 16-byte digest, otherwise
        the 32-character hexadecimal string.
    """
    if isinstance(text, (bytes, bytearray, memoryview)):
        payload = text
    else:
        payload = text.encode('utf-8')
    digest = hashlib.md5(payload)
    if isBackByte:
        return digest.digest()
    return digest.hexdigest()


def base64_encode(text, isBytes=False):
    """Base64-encode *text* and return the result as ``bytes``.

    :param text: ``str`` (encoded as UTF-8 first) or bytes.
    :param isBytes: set when *text* is already bytes.  Bytes input is also
        detected automatically, mirroring ``md5s``.
    """
    if isBytes or isinstance(text, (bytes, bytearray)):
        return base64.b64encode(text)
    return base64.b64encode(bytes(text, encoding="utf-8"))


def get_timestamp10():
    """Return the current Unix time as an integer number of seconds."""
    return int(time.time())


def get_sign_url_content(to_tt, host=RELAY_HOST, secret=SIGN_SECRET,
                         ttl=LINK_TTL_SECONDS, now=None):
    """Build the signed, expiring download link for one hourly log file.

    :param to_tt: hour stamp (``YYYYMMDDHH``) naming the file
        ``/Relay<to_tt>.txt``.
    :param host: ``host:port`` of the relay server.
    :param secret: shared signing secret.
    :param ttl: validity of the link in seconds, counted from *now*.
    :param now: Unix time to sign against; defaults to the current time.
    :return: ``{'url': <signed url>, 'time': to_tt}``.
    """
    path = '/Relay' + to_tt + '.txt'
    if now is None:
        now = get_timestamp10()
    expire = int(now) + ttl

    # Token = URL-safe base64 of the raw MD5 digest, without '=' padding.
    digest = md5s(str(secret) + path + str(expire), True)
    token = base64.urlsafe_b64encode(digest).decode('ascii').rstrip('=')

    sig_uri = 'http://' + host + path + '?st=' + token + '&e=' + str(expire)
    return {
        'url': sig_uri,
        'time': to_tt,
    }


def render_link_rows(links):
    """Render the HTML body: one ``LINK_TEMPLATE`` row per link dict.

    :param links: iterable of dicts with ``'time'`` and ``'url'`` keys, as
        returned by ``get_sign_url_content``.
    """
    return ''.join(
        LINK_TEMPLATE.replace("{{time}}", link['time'])
                     .replace("{{url}}", link['url'])
        for link in links
    )


def send_log_email(html_body):
    """Send *html_body* from and to ``COMPANY_EMAIL`` through Amazon SES.

    :return: the response of the SES ``send_email`` call.
    """
    # Imported lazily so the URL helpers can be imported without boto3.
    from boto3.session import Session

    session = Session(
        aws_access_key_id=AWS_ACCESS_ID,
        aws_secret_access_key=AWS_ACCESS_SECRET,
        region_name=AWS_ACCESS_REGION,
    )
    conn = session.client('ses')
    return conn.send_email(
        # Sender.
        Source=COMPANY_EMAIL,
        Destination={
            # Recipients.
            'ToAddresses': [COMPANY_EMAIL],
        },
        Message={
            'Subject': {
                'Data': EMAIL_SUBJECT,
                'Charset': 'utf-8',
            },
            'Body': {
                'Html': {
                    'Charset': 'UTF-8',
                    'Data': html_body,
                },
            },
        },
    )


def main():
    """Build yesterday's 24 signed links and email them."""
    to_date = datetime.datetime.now() - datetime.timedelta(days=1)
    links = [get_sign_url_content(to_tt=tt) for tt in getTimeDict(to_date)]
    html_data = render_link_rows(links)

    try:
        send_log_email(html_data)
    except Exception:
        # Top-level boundary: report the failure instead of crashing, as the
        # script always did.
        print(traceback.format_exc())
    else:
        print('yes')


if __name__ == '__main__':
    main()