#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: Ansjer @software: PyCharm @DATE: 2018/5/23 16:03 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: CloudfrontSignCookie.py @Contact: chanjunkai@163.com """ from boto.cloudfront.distribution import Distribution from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import serialization from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes import base64 import datetime from var_dump import var_dump import time class BetterThanBoto(Distribution): def sign_rsa(self, message): private_key = serialization.load_pem_private_key(self.keyfile, password=None, backend=default_backend()) signer = private_key.signer(padding.PKCS1v15(), hashes.SHA1()) message = message.encode('utf-8') signer.update(message) return signer.finalize() def _sign_string(self, message, private_key_file=None, private_key_string=None): if private_key_file: self.keyfile = open(private_key_file, 'rb').read() elif private_key_string: self.keyfile = private_key_string.encode('utf-8') return self.sign_rsa(message) @staticmethod def _url_base64_encode(msg): """ Base64 encodes a string using the URL-safe characters specified by Amazon. """ msg_base64 = base64.b64encode(msg).decode('utf-8') msg_base64 = msg_base64.replace('+', '-') msg_base64 = msg_base64.replace('=', '_') msg_base64 = msg_base64.replace('/', '~') return msg_base64 def generate_signature(self, policy, private_key_file=None): """ :param policy: no-whitespace json str (NOT encoded yet) :param private_key_file: your .pem file with which to sign the policy :return: encoded signature for use in cookie """ # Distribution._create_signing_params() signature = self._sign_string(policy, private_key_file) # now base64 encode the signature & make URL safe encoded_signature = self._url_base64_encode(signature) return encoded_signature def create_signed_cookies(self, url, private_key_file=None, keypair_id=None, expires_at=20, secure=True): policy = self._custom_policy( url, expires_at ) encoded_policy = self._url_base64_encode(policy.encode('utf-8')) signature = self.generate_signature( policy, private_key_file=private_key_file ) cookies = { "CloudFront-Policy": encoded_policy, "CloudFront-Signature": signature, "CloudFront-Key-Pair-Id": keypair_id } return cookies def sign_to_cloudfront(object_url, expires_at): """ Sign URL to distribute file""" cf = BetterThanBoto() url = cf.create_signed_url(url=object_url, keypair_id="APKAINI6BNPKV54NHH7Q", expire_time=expires_at, private_key_file="D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem") return url def create_signed_cookies(object_url, expires_at): """ Create a signed cookie """ cf = BetterThanBoto() cookies = cf.create_signed_cookies(url=object_url,keypair_id = "APKAINI6BNPKV54NHH7Q",expires_at = expires_at,private_key_file = "D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem") return cookies