123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
- @AUTHOR: ASJRD018
- @NAME: Ansjer
- @software: PyCharm
- @DATE: 2018/5/23 16:03
- @Version: python3.6
- @MODIFY DECORD:ansjer dev
- @file: CloudfrontSignCookie.py
- @Contact: chanjunkai@163.com
- """
- from boto.cloudfront.distribution import Distribution
- from cryptography.hazmat.primitives.asymmetric import padding
- from cryptography.hazmat.primitives import serialization
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives import hashes
- import base64
- import datetime
- from var_dump import var_dump
- import time
- class BetterThanBoto(Distribution):
- def sign_rsa(self, message):
- private_key = serialization.load_pem_private_key(self.keyfile, password=None,
- backend=default_backend())
- signer = private_key.signer(padding.PKCS1v15(), hashes.SHA1())
- message = message.encode('utf-8')
- signer.update(message)
- return signer.finalize()
- def _sign_string(self, message, private_key_file=None, private_key_string=None):
- if private_key_file:
- self.keyfile = open(private_key_file, 'rb').read()
- elif private_key_string:
- self.keyfile = private_key_string.encode('utf-8')
- return self.sign_rsa(message)
- @staticmethod
- def _url_base64_encode(msg):
- """
- Base64 encodes a string using the URL-safe characters specified by
- Amazon.
- """
- msg_base64 = base64.b64encode(msg).decode('utf-8')
- msg_base64 = msg_base64.replace('+', '-')
- msg_base64 = msg_base64.replace('=', '_')
- msg_base64 = msg_base64.replace('/', '~')
- return msg_base64
- def generate_signature(self, policy, private_key_file=None):
- """
- :param policy: no-whitespace json str (NOT encoded yet)
- :param private_key_file: your .pem file with which to sign the policy
- :return: encoded signature for use in cookie
- """
- # Distribution._create_signing_params()
- signature = self._sign_string(policy, private_key_file)
- # now base64 encode the signature & make URL safe
- encoded_signature = self._url_base64_encode(signature)
- return encoded_signature
- def create_signed_cookies(self, url, private_key_file=None, keypair_id=None,
- expires_at=20, secure=True):
- policy = self._custom_policy(
- url,
- expires_at
- )
- encoded_policy = self._url_base64_encode(policy.encode('utf-8'))
- signature = self.generate_signature(
- policy, private_key_file=private_key_file
- )
- cookies = {
- "CloudFront-Policy": encoded_policy,
- "CloudFront-Signature": signature,
- "CloudFront-Key-Pair-Id": keypair_id
- }
- return cookies
- def sign_to_cloudfront(object_url, expires_at):
- """ Sign URL to distribute file"""
- cf = BetterThanBoto()
- url = cf.create_signed_url(url=object_url,
- keypair_id="APKAINI6BNPKV54NHH7Q",
- expire_time=expires_at,
- private_key_file="D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem")
- return url
- def create_signed_cookies(object_url, expires_at):
- """
- Create a signed cookie
- """
- cf = BetterThanBoto()
- cookies = cf.create_signed_cookies(url=object_url,keypair_id = "APKAINI6BNPKV54NHH7Q",expires_at = expires_at,private_key_file = "D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem")
- return cookies
|