CloudfrontSignCookie.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
  5. @AUTHOR: ASJRD018
  6. @NAME: Ansjer
  7. @software: PyCharm
  8. @DATE: 2018/5/23 16:03
  9. @Version: python3.6
  10. @MODIFY DECORD:ansjer dev
  11. @file: CloudfrontSignCookie.py
  12. @Contact: chanjunkai@163.com
  13. """
  14. from boto.cloudfront.distribution import Distribution
  15. from cryptography.hazmat.primitives.asymmetric import padding
  16. from cryptography.hazmat.primitives import serialization
  17. from cryptography.hazmat.backends import default_backend
  18. from cryptography.hazmat.primitives import hashes
  19. import base64
  20. import datetime
  21. from var_dump import var_dump
  22. import time
  23. class BetterThanBoto(Distribution):
  24. def sign_rsa(self, message):
  25. private_key = serialization.load_pem_private_key(self.keyfile, password=None,
  26. backend=default_backend())
  27. signer = private_key.signer(padding.PKCS1v15(), hashes.SHA1())
  28. message = message.encode('utf-8')
  29. signer.update(message)
  30. return signer.finalize()
  31. def _sign_string(self, message, private_key_file=None, private_key_string=None):
  32. if private_key_file:
  33. self.keyfile = open(private_key_file, 'rb').read()
  34. elif private_key_string:
  35. self.keyfile = private_key_string.encode('utf-8')
  36. return self.sign_rsa(message)
  37. @staticmethod
  38. def _url_base64_encode(msg):
  39. """
  40. Base64 encodes a string using the URL-safe characters specified by
  41. Amazon.
  42. """
  43. msg_base64 = base64.b64encode(msg).decode('utf-8')
  44. msg_base64 = msg_base64.replace('+', '-')
  45. msg_base64 = msg_base64.replace('=', '_')
  46. msg_base64 = msg_base64.replace('/', '~')
  47. return msg_base64
  48. def generate_signature(self, policy, private_key_file=None):
  49. """
  50. :param policy: no-whitespace json str (NOT encoded yet)
  51. :param private_key_file: your .pem file with which to sign the policy
  52. :return: encoded signature for use in cookie
  53. """
  54. # Distribution._create_signing_params()
  55. signature = self._sign_string(policy, private_key_file)
  56. # now base64 encode the signature & make URL safe
  57. encoded_signature = self._url_base64_encode(signature)
  58. return encoded_signature
  59. def create_signed_cookies(self, url, private_key_file=None, keypair_id=None,
  60. expires_at=20, secure=True):
  61. policy = self._custom_policy(
  62. url,
  63. expires_at
  64. )
  65. encoded_policy = self._url_base64_encode(policy.encode('utf-8'))
  66. signature = self.generate_signature(
  67. policy, private_key_file=private_key_file
  68. )
  69. cookies = {
  70. "CloudFront-Policy": encoded_policy,
  71. "CloudFront-Signature": signature,
  72. "CloudFront-Key-Pair-Id": keypair_id
  73. }
  74. return cookies
  75. def sign_to_cloudfront(object_url, expires_at):
  76. """ Sign URL to distribute file"""
  77. cf = BetterThanBoto()
  78. url = cf.create_signed_url(url=object_url,
  79. keypair_id="APKAINI6BNPKV54NHH7Q",
  80. expire_time=expires_at,
  81. private_key_file="D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem")
  82. return url
  83. def create_signed_cookies(object_url, expires_at):
  84. """
  85. Create a signed cookie
  86. """
  87. cf = BetterThanBoto()
  88. cookies = cf.create_signed_cookies(url=object_url,keypair_id = "APKAINI6BNPKV54NHH7Q",expires_at = expires_at,private_key_file = "D:/project_svn/Ansjer/test/pk-APKAINI6BNPKV54NHH7Q.pem")
  89. return cookies