import base64
import datetime
import hashlib
from urllib.parse import parse_qs

import requests
from OpenSSL import crypto

from Ansjer.config import BASE_DIR, SERVER_DOMAIN_SSL


class UnionPayObject:
    """Client for the UnionPay gateway: app payment, order query and refund.

    The signing certificate (a PKCS#12 file) is loaded once at construction.
    Every request is built from the common fields in ``self.param`` plus the
    request-specific fields, signed, and POSTed as form data.

    NOTE(review): ``load_pkcs12``, ``sign`` and ``verify`` are deprecated in
    recent pyOpenSSL releases -- confirm against the pinned version.
    """

    # Seconds to wait for the gateway; without a timeout ``requests`` would
    # block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.cert_password = b'000000'
        self.cert_path = '{}/Ansjer/file/unionpay/acp_test_sign.pfx'.format(BASE_DIR)
        self.x509_filepath = '{}/Ansjer/file/unionpay/verify_sign_acp.cer'.format(BASE_DIR)
        self.cert = self.get_cert()
        self.pay_url = 'https://gateway.test.cup.com.cn/gateway/api/appTransReq.do'
        self.query_url = 'https://gateway.test.95516.com/gateway/api/queryTrans.do'
        self.refund_url = 'https://101.231.204.80:5000/gateway/api/backTransReq.do'
        # Fields shared by every request. Treated as read-only: each request
        # works on its own copy (see _send).
        self.param = {
            "version": '5.1.0',  # protocol version
            "encoding": "utf-8",  # encoding; UTF-8 or GBK may be used
            "bizType": "000201",  # product type, 000201: B2C gateway payment
            "signMethod": "01",  # signature method, 01: RSA
            "accessType": "0",  # access type, 0: direct merchant access
            "merId": '777290058205683',  # merchant id
            # Order send time. Captured once here, so every request made
            # through this instance carries the same timestamp.
            # NOTE(review): confirm that reuse across pay/query is intended.
            "txnTime": datetime.datetime.now().strftime("%Y%m%d%H%M%S"),
            "certId": self.cert['certid'],
        }

    def _send(self, url, request_data):
        """Sign and POST one request, returning the parsed response.

        :param url: gateway endpoint to POST to.
        :param request_data: request-specific fields; they override the
            common fields from ``self.param``.
        :return: the parsed response dict when ``respCode`` is ``'00'``,
            otherwise ``False``.
        """
        # Fresh dict per request: fields and the signature of an earlier call
        # must not leak into (and be signed as part of) the next one.
        params = {**self.param, **request_data}
        self.get_sign(params)
        response = requests.post(url, params, timeout=self.REQUEST_TIMEOUT)
        data = self.parse_arguments(response.content.decode('utf-8'))
        # .get(): a body without respCode (e.g. an error page) is a failure,
        # not a KeyError.
        if data.get('respCode') != '00':
            return False
        return data

    def pay(self, order_id, price):
        """Submit a consumption (payment) request.

        :param order_id: merchant order id.
        :param price: order amount, in fen.
        :return: parsed response dict on success, ``False`` otherwise.
        """
        request_data = {
            "txnType": '01',  # transaction type, 01: consumption
            "txnSubType": "01",  # transaction sub-type, 01: self-service consumption
            # Backend notification URL; must be reachable from the internet.
            "backUrl": 'https://test.zositechc.cn/cloudstorage/doapplenotify',
            "currencyCode": "156",  # currency, 156: RMB
            "txnAmt": price,  # order amount (unit: fen)
            "orderId": order_id,
            "channelType": "08",  # channel type, 08: mobile
        }
        # The response signature is currently not verified (validate() is not
        # called on this path).
        return self._send(self.pay_url, request_data)

    def get_cert(self):
        """Load the signing certificate and private key from ``self.cert_path``.

        :return: dict with ``certid`` (the certificate serial number) and
            ``pkey`` (the private key object).
        """
        with open(self.cert_path, "rb") as f:
            raw = f.read()
        certs = crypto.load_pkcs12(raw, self.cert_password)
        x509data = certs.get_certificate()
        return {'certid': x509data.get_serial_number(), 'pkey': certs.get_privatekey()}

    def get_sign(self, data):
        """Sign ``data`` in place, storing the result under ``"signature"``.

        The string to sign is the hex SHA-256 digest of the sorted
        ``key=value`` string; it is then signed with the private key and
        base64-encoded.

        :param data: request fields; mutated to add/replace ``"signature"``.
        """
        # An existing signature is never part of the signed content (validate
        # removes it before digesting as well), so re-signing is idempotent.
        unsigned = {key: value for key, value in data.items() if key != "signature"}
        digest = hashlib.sha256(self.build_sign_str(unsigned).encode("utf-8")).hexdigest()
        # Pass bytes explicitly instead of relying on implicit str conversion.
        signed = crypto.sign(self.cert["pkey"], digest.encode("utf-8"), "sha256")
        data["signature"] = str(base64.b64encode(signed), encoding="utf-8")

    @staticmethod
    def build_sign_str(data):
        """Build the canonical string used for signing.

        Keys are sorted, entries whose value is the empty string are skipped,
        and the rest are joined as ``key=value`` pairs separated by ``&``.

        :param data: mapping of field name to value.
        :return: the joined string.
        """
        return '&'.join(
            "%s=%s" % (key, data[key]) for key in sorted(data) if data[key] != ''
        )

    @staticmethod
    def parse_arguments(raw):
        """Parse a ``key=value&key=value`` body into a flat dict.

        When a key occurs more than once the last value wins; keys with blank
        values are dropped by ``parse_qs``.

        NOTE(review): ``parse_qs`` percent-decodes values and turns ``+`` into
        a space. If the gateway sends base64 fields (signature, certificate)
        without percent-encoding, they are altered here -- confirm.

        :param raw: raw body to parse.
        :return: dict mapping each key to its last value.
        """
        return {name: values[-1] for name, values in parse_qs(str(raw)).items()}

    def validate(self, data):
        """Verify the signature of a parsed response.

        NOTE(review): the certificate used for verification is taken from the
        response itself (``signPubKeyCert``) and is not checked against a
        trusted certificate such as ``self.x509_filepath``; a sender can
        therefore supply its own certificate. Confirm before relying on this.

        :param data: parsed response; must contain ``"signature"``, which is
            removed from the dict as a side effect.
        :return: ``True`` when the signature verifies, ``False`` otherwise.
        :raises KeyError: if ``"signature"`` is missing.
        """
        signature = base64.b64decode(data.pop('signature'))
        sign_pubkey_cert = data.get("signPubKeyCert", None)
        if not isinstance(sign_pubkey_cert, str) or not sign_pubkey_cert:
            # No usable certificate to verify against.
            return False
        digest = hashlib.sha256(self.build_sign_str(data).encode("utf-8")).hexdigest()
        try:
            x509_cert = crypto.load_certificate(
                crypto.FILETYPE_PEM, sign_pubkey_cert.encode('utf-8')
            )
            crypto.verify(x509_cert, signature, digest.encode("utf-8"), 'sha256')
        except crypto.Error:
            # Unparseable certificate or signature mismatch.
            return False
        return True

    def query_order(self, order_id):
        """Query the state of an order.

        :param order_id: merchant order id.
        :return: parsed response dict on success, ``False`` otherwise.
        """
        request_data = {
            "txnType": '00',  # transaction type, 00: query
            "txnSubType": "00",  # transaction sub-type, 00: query
            "orderId": order_id,
        }
        return self._send(self.query_url, request_data)

    def refund(self, order_id, query_id, price):
        """Request a refund for an earlier transaction.

        :param order_id: merchant order id sent with the refund request.
        :param query_id: sent as ``origQryId`` to identify the original
            transaction.
        :param price: refund amount, in the same unit as ``pay``.
        :return: parsed response dict on success, ``False`` otherwise.
        """
        request_data = {
            "txnType": "04",  # transaction type, 04: refund
            "txnSubType": "00",  # transaction sub-type
            "orderId": order_id,
            "origQryId": query_id,
            "backUrl": "https://test.zositechc.cn/cloudstorage/doapplenotify",
            "currencyCode": "156",
            "channelType": "08",
            "txnAmt": price
        }
        return self._send(self.refund_url, request_data)