#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: Ansjer
@software: PyCharm
@DATE: 2018/5/23 16:03
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: CloudfrontSignCookie.py
@Contact: chanjunkai@163.com
"""
from boto.cloudfront.distribution import Distribution
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
import base64
import datetime
from var_dump import var_dump
import time

# Key pair used by the module-level helpers when the caller does not pass
# its own. Values are the ones this module has always used.
DEFAULT_KEYPAIR_ID = "APKAINI6BNPKV54NHH7Q"
DEFAULT_PRIVATE_KEY_FILE = "D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem"


class BetterThanBoto(Distribution):
    """boto ``Distribution`` whose signing step uses the ``cryptography`` package.

    ``_sign_string`` is overridden so that every signing path of the base
    class goes through :meth:`sign_rsa`, and :meth:`create_signed_cookies`
    is added to build the three ``CloudFront-*`` cookie values.
    """

    def sign_rsa(self, message):
        """Sign ``message`` with the PEM private key held in ``self.keyfile``.

        :param message: text (encoded as UTF-8 before signing) or bytes
        :return: raw signature bytes (RSA, PKCS#1 v1.5 padding, SHA-1 digest)
        """
        private_key = serialization.load_pem_private_key(
            self.keyfile, password=None, backend=default_backend()
        )
        if isinstance(message, str):
            message = message.encode('utf-8')
        # One-shot sign(); the older signer()/update()/finalize() context was
        # deprecated in cryptography 2.0 and later removed.
        return private_key.sign(message, padding.PKCS1v15(), hashes.SHA1())

    def _sign_string(self, message, private_key_file=None, private_key_string=None):
        """Load the private key and return the raw signature of ``message``.

        :param message: policy text to sign
        :param private_key_file: path of a PEM file; takes precedence when given
        :param private_key_string: PEM key material as text or bytes
        :return: raw signature bytes
        :raises ValueError: if no key is supplied and none was loaded earlier
        """
        if private_key_file:
            # Context manager so the key file handle is always closed.
            with open(private_key_file, 'rb') as key_handle:
                self.keyfile = key_handle.read()
        elif private_key_string:
            if isinstance(private_key_string, str):
                private_key_string = private_key_string.encode('utf-8')
            self.keyfile = private_key_string
        elif getattr(self, 'keyfile', None) is None:
            # Previously this surfaced as an AttributeError inside sign_rsa.
            raise ValueError(
                "You must specify one of private_key_file or private_key_string"
            )
        return self.sign_rsa(message)

    @staticmethod
    def _url_base64_encode(msg):
        """
        Base64 encodes a string using the URL-safe characters specified by
        Amazon.

        :param msg: bytes to encode
        :return: str with ``+`` -> ``-``, ``=`` -> ``_`` and ``/`` -> ``~``
        """
        msg_base64 = base64.b64encode(msg).decode('utf-8')
        msg_base64 = msg_base64.replace('+', '-')
        msg_base64 = msg_base64.replace('=', '_')
        msg_base64 = msg_base64.replace('/', '~')
        return msg_base64

    def generate_signature(self, policy, private_key_file=None):
        """
        :param policy: no-whitespace json str (NOT encoded yet)
        :param private_key_file: your .pem file with which to sign the policy
        :return: encoded signature for use in cookie
        """
        signature = self._sign_string(policy, private_key_file)
        # now base64 encode the signature & make URL safe
        return self._url_base64_encode(signature)

    def create_signed_cookies(self, url, private_key_file=None, keypair_id=None,
                              expires_at=20, secure=True):
        """Build the CloudFront signed-cookie values for ``url``.

        :param url: resource URL handed to the inherited ``_custom_policy``
        :param private_key_file: path of the PEM file used to sign the policy
        :param keypair_id: value stored under ``CloudFront-Key-Pair-Id``
        :param expires_at: expiry handed to ``_custom_policy`` as its second
            argument. NOTE(review): presumably a Unix epoch timestamp, in which
            case the default of 20 is already expired - confirm against boto.
        :param secure: accepted for interface compatibility; not used here
        :return: dict with the ``CloudFront-Policy``, ``CloudFront-Signature``
            and ``CloudFront-Key-Pair-Id`` entries
        """
        policy = self._custom_policy(url, expires_at)
        encoded_policy = self._url_base64_encode(policy.encode('utf-8'))
        # The signature is computed over the raw policy, not the encoded one.
        signature = self.generate_signature(
            policy, private_key_file=private_key_file
        )
        return {
            "CloudFront-Policy": encoded_policy,
            "CloudFront-Signature": signature,
            "CloudFront-Key-Pair-Id": keypair_id,
        }


def sign_to_cloudfront(object_url, expires_at, keypair_id=DEFAULT_KEYPAIR_ID,
                       private_key_file=DEFAULT_PRIVATE_KEY_FILE):
    """Sign URL to distribute file.

    :param object_url: URL to sign
    :param expires_at: passed to ``create_signed_url`` as ``expire_time``
    :param keypair_id: CloudFront key pair id; defaults to the module's key
    :param private_key_file: path of the matching PEM private key
    :return: whatever the inherited ``create_signed_url`` returns
    """
    cf = BetterThanBoto()
    return cf.create_signed_url(
        url=object_url,
        keypair_id=keypair_id,
        expire_time=expires_at,
        private_key_file=private_key_file,
    )


def create_signed_cookies(object_url, expires_at, keypair_id=DEFAULT_KEYPAIR_ID,
                          private_key_file=DEFAULT_PRIVATE_KEY_FILE):
    """
    Create a signed cookie

    :param object_url: resource URL the cookies grant access to
    :param expires_at: expiry forwarded to ``BetterThanBoto.create_signed_cookies``
    :param keypair_id: CloudFront key pair id; defaults to the module's key
    :param private_key_file: path of the matching PEM private key
    :return: dict of ``CloudFront-*`` cookie names to values
    """
    cf = BetterThanBoto()
    return cf.create_signed_cookies(
        url=object_url,
        keypair_id=keypair_id,
        expires_at=expires_at,
        private_key_file=private_key_file,
    )