123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import hashlib
- import requests
- import base64
- from OpenSSL import crypto
- from Ansjer.config import BASE_DIR, SERVER_DOMAIN_SSL
- import datetime
- from urllib.parse import parse_qs
class UnionPayObject:
    """Small client for the UnionPay gateway: pay, query and refund requests.

    The endpoint URLs, signing certificate and merchant id configured in
    ``__init__`` all point at test hosts/files. Constructing an instance
    reads the PKCS#12 signing certificate from disk.

    ``self.param`` holds the fields shared by every request. It is used as
    a read-only template: each request works on its own copy, so fields of
    one request (and its signature) never leak into the next one.
    """

    # Seconds to wait for the gateway before requests raises a Timeout;
    # without a timeout a stalled gateway would block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Load the signing certificate and prepare the shared request fields."""
        self.cert_password = b'000000'
        self.cert_path = '{}/Ansjer/file/unionpay/acp_test_sign.pfx'.format(BASE_DIR)
        # Stored but not read anywhere in this class.
        self.x509_filepath = '{}/Ansjer/file/unionpay/verify_sign_acp.cer'.format(BASE_DIR)
        self.cert = self.get_cert()
        self.pay_url = 'https://gateway.test.cup.com.cn/gateway/api/appTransReq.do'
        self.query_url = 'https://gateway.test.95516.com/gateway/api/queryTrans.do'
        self.refund_url = 'https://101.231.204.80:5000/gateway/api/backTransReq.do'
        self.param = {
            "version": '5.1.0',  # protocol version
            "encoding": "utf-8",  # encoding; UTF-8 or GBK may be used
            "bizType": "000201",  # product type, 000201: B2C gateway payment
            "signMethod": "01",  # signature method, 01: RSA signature
            "accessType": "0",  # access type, 0: direct merchant access
            "merId": '777290058205683',  # merchant code
            # Order send time; fixed when the instance is created.
            "txnTime": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
            "certId": self.cert['certid'],
        }

    def pay(self, order_id, price):
        """Send a purchase request.

        :param order_id: merchant order id, sent as ``orderId``
        :param price: order amount in fen, sent as ``txnAmt``
        :return: the parsed response dict when ``respCode`` is ``'00'``,
            otherwise ``False``
        """
        request_data = {
            "txnType": '01',  # transaction type, 01: purchase
            "txnSubType": "01",  # transaction sub-type, 01: self-service purchase
            # Backend notification URL; must be reachable from the public network.
            "backUrl": 'https://test.zositechc.cn/cloudstorage/doapplenotify',
            "currencyCode": "156",  # currency, 156: RMB
            # "accType": "01",  # account type, 01: bank card, 02: passbook, 03: IC card
            "txnAmt": price,  # order amount (unit: fen)
            "orderId": order_id,
            "channelType": "08",  # channel type, 08: mobile
        }
        # NOTE(review): the response signature is not verified here (the
        # original had the validate() call commented out).
        return self._send(self.pay_url, request_data)

    def get_cert(self):
        """Read the PKCS#12 file at ``self.cert_path``.

        :return: dict with ``certid`` (the certificate serial number) and
            ``pkey`` (the private key used for signing)
        """
        # NOTE(review): crypto.load_pkcs12 is deprecated in recent pyOpenSSL
        # releases -- confirm the pinned version still provides it.
        with open(self.cert_path, "rb") as f:
            certs = crypto.load_pkcs12(f.read(), self.cert_password)
        x509data = certs.get_certificate()
        return {'certid': x509data.get_serial_number(), 'pkey': certs.get_privatekey()}

    def get_sign(self, data):
        """Sign ``data`` in place, storing the result under ``"signature"``.

        The signed payload is the hex SHA-256 digest of the sorted
        ``key=value`` string. An existing ``"signature"`` entry is left out
        of that string, so re-signing a dict does not sign over the old
        signature.

        :param data: request fields; mutated to carry the new signature
        """
        digest = self._sign_digest(data)
        # Encode explicitly: pyOpenSSL expects bytes here.
        raw_signature = crypto.sign(self.cert["pkey"], digest.encode("utf-8"), "sha256")
        data["signature"] = base64.b64encode(raw_signature).decode("utf-8")

    @classmethod
    def _sign_digest(cls, data):
        """Return the hex SHA-256 of the sign string of ``data`` minus ``signature``."""
        unsigned = {key: value for key, value in data.items() if key != 'signature'}
        return hashlib.sha256(cls.build_sign_str(unsigned).encode("utf-8")).hexdigest()

    @staticmethod
    def build_sign_str(data):
        """Build the string that gets signed.

        Keys are sorted, entries whose value equals ``''`` are skipped, and
        the remaining pairs are joined as ``key=value`` with ``&``.

        :param data: mapping of field name to value
        :return: the joined string (empty for an empty mapping)
        """
        return '&'.join(
            "%s=%s" % (key, data[key]) for key in sorted(data) if data[key] != ''
        )

    @staticmethod
    def parse_arguments(raw):
        """Parse a ``key=value&key=value`` body into a flat dict.

        When a key occurs more than once the last value wins.

        :param raw: raw data to parse
        :return: dict mapping each key to a single string value
        """
        # NOTE(review): parse_qs URL-decodes values, so a literal '+' becomes
        # a space and blank values are dropped. If the gateway body is not
        # URL-encoded this corrupts base64 fields such as the signature --
        # confirm against the gateway's response format.
        return {name: values[-1] for name, values in parse_qs(str(raw)).items()}

    def validate(self, data):
        """Check the signature of a response dict.

        The dict is not modified. A missing ``signature`` or
        ``signPubKeyCert`` field, or any failure while decoding or
        verifying, counts as an invalid signature.

        :param data: parsed response fields
        :return: ``True`` when the signature verifies, otherwise ``False``
        """
        encoded_signature = data.get('signature')
        sign_pubkey_cert = data.get("signPubKeyCert")
        if not encoded_signature or not sign_pubkey_cert:
            return False
        digest = self._sign_digest(data)
        # NOTE(review): the certificate comes from the response being checked
        # and is not compared with a trusted one (self.x509_filepath is
        # unused), so this only shows the message is self-consistent.
        try:
            signature = base64.b64decode(encoded_signature)
            x509_cert = crypto.load_certificate(
                crypto.FILETYPE_PEM, sign_pubkey_cert.encode('utf-8')
            )
            crypto.verify(x509_cert, signature, digest.encode('utf-8'), 'sha256')
        except Exception:
            # Deliberately broad: any failure here means "not verified".
            return False
        return True

    def query_order(self, order_id):
        """Send a transaction query for ``order_id``.

        :param order_id: merchant order id, sent as ``orderId``
        :return: the parsed response dict when ``respCode`` is ``'00'``,
            otherwise ``False``
        """
        request_data = {
            "txnType": '00',  # transaction type, 00: query
            "txnSubType": "00",  # transaction sub-type, 00: query
            "orderId": order_id,
        }
        return self._send(self.query_url, request_data)

    def refund(self, order_id, query_id, price):
        """Send a refund request.

        :param order_id: merchant order id, sent as ``orderId``
        :param query_id: sent as ``origQryId``
        :param price: refund amount, sent as ``txnAmt``
        :return: the parsed response dict when ``respCode`` is ``'00'``,
            otherwise ``False``
        """
        request_data = {
            "txnType": "04",  # transaction type, 04: refund
            "txnSubType": "00",  # transaction sub-type -- TODO confirm meaning for refunds
            "orderId": order_id,
            "origQryId": query_id,
            "backUrl": "https://test.zositechc.cn/cloudstorage/doapplenotify",
            "currencyCode": "156",
            "channelType": "08",
            "txnAmt": price
        }
        return self._send(self.refund_url, request_data)

    def _send(self, url, request_data):
        """Merge, sign and POST one request; return the parsed body or False."""
        # Work on a copy so self.param stays a clean template between calls.
        params = dict(self.param)
        params.update(request_data)
        self.get_sign(params)
        response = requests.post(url, data=params, timeout=self.REQUEST_TIMEOUT)
        data = self.parse_arguments(response.content.decode('utf-8'))
        # .get(): a body without respCode is a failed request, not a crash.
        if data.get('respCode') != '00':
            return False
        return data
|