UnionPayObject.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import hashlib
  2. import requests
  3. import base64
  4. from OpenSSL import crypto
  5. from Ansjer.config import BASE_DIR, SERVER_DOMAIN_SSL
  6. import datetime
  7. from urllib.parse import parse_qs
  8. class UnionPayObject:
  9. def __init__(self):
  10. self.cert_password = b'000000'
  11. self.cert_path = '{}/Ansjer/file/unionpay/acp_test_sign.pfx'.format(BASE_DIR)
  12. self.x509_filepath = '{}/Ansjer/file/unionpay/verify_sign_acp.cer'.format(BASE_DIR)
  13. self.cert = self.get_cert()
  14. self.pay_url = 'https://gateway.test.cup.com.cn/gateway/api/appTransReq.do'
  15. self.query_url = 'https://gateway.test.95516.com/gateway/api/queryTrans.do'
  16. self.refund_url = 'https://101.231.204.80:5000/gateway/api/backTransReq.do'
  17. self.param = {
  18. "version": '5.1.0', # 版本
  19. "encoding": "utf-8", # 编码 可以使用UTF-8,GBK两种方式
  20. "bizType": "000201", # 产品类型 000201:B2C网关支付
  21. "signMethod": "01", # 签名方法 01:RSA签名
  22. "accessType": "0", # 接入类型 0:普通商户直连接入
  23. "merId": '777290058205683', # 商户代码
  24. "txnTime": datetime.datetime.now().strftime("%Y%m%d%H%M%S"), # 订单发送时间
  25. "certId": self.cert['certid'],
  26. }
  27. def pay(self, order_id, price):
  28. request_data = {
  29. "txnType": '01', # 交易类型 01:消费
  30. "txnSubType": "01", # 交易子类 01:自助消费
  31. "backUrl": 'https://test.zositechc.cn/cloudstorage/doapplenotify', # 后台通知地址 需外网
  32. "currencyCode": "156", # 交易币种 156:人民币
  33. # "accType": "01", # 账号类型 01:银行卡02:存折03:IC卡帐号类型(卡介质)
  34. "txnAmt": price, # 订单金额(单位: 分)
  35. "orderId": order_id,
  36. "channelType": "08", # 渠道类型 08:移动
  37. }
  38. self.param.update(request_data)
  39. self.get_sign(self.param)
  40. response = requests.post(self.pay_url, self.param)
  41. data = self.parse_arguments(response.content.decode('utf-8'))
  42. if data['respCode'] != '00':
  43. return False
  44. # self.validate(data)
  45. return data
  46. def get_cert(self):
  47. with open(self.cert_path, "rb") as f:
  48. certs = crypto.load_pkcs12(f.read(), self.cert_password)
  49. x509data = certs.get_certificate()
  50. return {'certid': x509data.get_serial_number(), 'pkey': certs.get_privatekey()}
  51. def get_sign(self, data):
  52. sha256 = hashlib.sha256(self.build_sign_str(data).encode("utf-8")).hexdigest()
  53. private = crypto.sign(self.cert["pkey"], sha256, "sha256")
  54. data["signature"] = str(base64.b64encode(private), encoding="utf-8")
  55. @staticmethod
  56. def build_sign_str(data):
  57. """
  58. 排序
  59. :param data:
  60. :return:
  61. """
  62. req = []
  63. for key in sorted(data.keys()):
  64. if data[key] != '':
  65. req.append("%s=%s" % (key, data[key]))
  66. return '&'.join(req)
  67. @staticmethod
  68. def parse_arguments(raw):
  69. """
  70. @raw: raw data to parse argument
  71. """
  72. data = {}
  73. qs_params = parse_qs(str(raw))
  74. for name in qs_params.keys():
  75. data[name] = qs_params.get(name)[-1]
  76. return data
  77. def validate(self, data):
  78. """
  79. @data: a dict ready for validate, must contain "signature" key name
  80. """
  81. signature = base64.b64decode(data.pop('signature')) # 获取签名
  82. sign_pubkey_cert = data.get("signPubKeyCert", None)
  83. digest = hashlib.sha256(self.build_sign_str(data).encode("utf-8")).hexdigest()
  84. # sign_pubkey_cert = open(self.x509_filepath, 'rb').read()
  85. try:
  86. x509_ert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_pubkey_cert.encode('utf-8'))
  87. crypto.verify(x509_ert, signature, digest, 'sha256')
  88. return True
  89. except Exception as exc:
  90. return False
  91. def query_order(self, order_id):
  92. request_data = {
  93. "txnType": '00', # 交易类型 00:查询交易
  94. "txnSubType": "00", # 交易子类 00:查询交易
  95. "orderId": order_id,
  96. }
  97. self.param.update(request_data)
  98. self.get_sign(self.param)
  99. response = requests.post(self.query_url, self.param)
  100. data = self.parse_arguments(response.content.decode('utf-8'))
  101. if data['respCode'] != '00':
  102. return False
  103. return data
  104. def refund(self, order_id, query_id, price):
  105. request_data = {
  106. "txnType": "04", # 交易类型 00:退货
  107. "txnSubType": "00", # 交易子类 00:查询交易
  108. "orderId": order_id,
  109. "origQryId": query_id,
  110. "backUrl": "https://test.zositechc.cn/cloudstorage/doapplenotify",
  111. "currencyCode": "156",
  112. "channelType": "08",
  113. "txnAmt": price
  114. }
  115. self.param.update(request_data)
  116. self.get_sign(self.param)
  117. response = requests.post(self.refund_url, self.param)
  118. data = self.parse_arguments(response.content.decode('utf-8'))
  119. if data['respCode'] != '00':
  120. return False
  121. return data