123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- # # -*- encoding: utf-8 -*-
- # """
- # @File : UnicomObject.py
- # @Time : 2022/6/17 11:03
- # @Author : stephen
- # @Email : zhangdongming@asj6.wecom.work
- # @Software: PyCharm
- # """
- # import base64
- # import json
- #
- # import requests
- # from Crypto.Cipher import AES
- # from decimal import Decimal
- #
- # from pysmx.SM3 import hash_msg as sm3_msg
- #
- # from Ansjer.config import unicomAppUrl, unicomAppId, unicomAppSecret, unicomTenantId, \
- # unicomEncodeKey, unicomIvKey, unicomUserName, unicomPassword, unicomPushKey
- # from Object.utils.SymmetricCryptoUtil import AESencrypt
- #
- # """
- # 联通4Gapi
- # 具体参数查看接口文档
- # https://www.showdoc.com.cn/unicomJYHapi/8158648460007467
- # """
- #
- #
- # class UnicomObjeect:
- # def __init__(self):
- # self.appUrl = unicomAppUrl
- # self.appId = unicomAppId
- # self.appSecret = unicomAppSecret
- # self.tenantId = unicomTenantId
- # self.encodeKey = unicomEncodeKey
- # self.ivKey = unicomIvKey
- # self.username = unicomUserName
- # self.password = unicomPassword
- # self.pushKey = unicomPushKey
- # self.headers = {'Tenant': self.tenantId, 'content-type': 'application/x-www-form-urlencoded'}
- #
- # # pip install snowland-smx
- # def createSign(self, reverse=False, **sign_params):
- # """
- # 调用接口(API)时需要对请求参数进行签名(sign)验证,
- # 算法:
- # 根据参数名称将你的所有请求参数按照字母先后顺序排序: key = value & key = value,对除签名外的所有请求参数按
- # key
- # 做的升序排列。
- # 如:将
- # foo = 1, bar = 2, baz = 3
- # 排序为
- # bar = 2, baz = 3, foo = 1
- # 参数名和参数值链接后,得到拼装字符串(注意:参数名与参数值左右两边不能包含空格)
- # bar = 2 & baz = 3 & foo = 1
- # 将
- # pushkey拼接到参数字符尾部进行
- # SM3
- # 加密,再转化成大写,格式是
- # (SM3(key1=value1 & key2=value2 &...& key= pushKey)).upcase
- # @param reverse:
- # @param sign_params:
- # @return:
- # """
- # dict_2 = dict(sorted(sign_params.items(), key=lambda item: item[0], reverse=reverse))
- # data_list = []
- # for item in dict_2.items():
- # if item[0] and item[1]:
- # data_list.append("{}={}".format(item[0], item[1]))
- # val = '&'.join(data_list)
- # push_key = '&key={}'.format(self.pushKey)
- # val = val + push_key
- # return sm3_msg(val).upper()
- #
- # def get_login_authorization(self):
- # """
- # 获取登录授权
- # 注意登录认证Authorization和登录后的请求认证不一样
- # 算法:appId+":"+appSecret base64转码生成 前面加Basic
- # @return: "Basic " + base64_data
- # """
- # voucher = self.appId + ':' + self.appSecret
- # base64_data = str(base64.b64encode(voucher.encode("utf-8")), "utf-8")
- # return "Basic " + base64_data
- #
- # def get_encode_password(self):
- # """
- # 获取对称加密
- # @return: encrypt_pwd
- # """
- # aes = AESencrypt(self.encodeKey.encode('utf-8'), AES.MODE_CBC, self.ivKey.encode('utf-8'),
- # paddingMode="ZeroPadding",
- # characterSet='utf-8')
- # encrypt_pwd = aes.encryptFromString(self.password)
- # return encrypt_pwd
- #
- # def generate_token(self):
- # """
- # 生成令牌
- # @return: token
- # """
- # url = self.appUrl + '/auc/oauth/token'
- # pwd = self.get_encode_password()
- # body = {'username': self.username, 'password': pwd, 'grant_type': 'password', 'scope': 'server'}
- # headers = self.headers
- # headers['Authorization'] = self.get_login_authorization()
- # response_data = requests.post(url, data=body, headers=headers)
- # response_data = json.loads(response_data.text)
- # return response_data['access_token']
- #
- # def refresh_token(self, refresh_token):
- # """
- # 刷新令牌
- # @param refresh_token:
- # @return:
- # """
- # url = self.appUrl + '/auc/oauth/token?grant_type=refresh_token'
- # body = {'refresh_token': refresh_token, 'grant_type': 'refresh_token'}
- # headers = self.headers
- # headers['Authorization'] = self.get_login_authorization()
- # response_data = requests.post(url, data=body, headers=headers)
- # return response_data.text
- #
- # def business_unify_headers(self):
- # """
- # 业务统一headers
- # 在请求头中增加key为Authorization,value为"Bearer " + token
- # @return: headers
- # """
- # token = self.generate_token()
- # headers = self.headers
- # headers['Authorization'] = 'Bearer ' + token
- # return headers
- #
- # # 业务api 注意 get与post 请求数据类型不同 post使用:application/json
- # def verify_device(self, **re_params):
- # """
- # 验证设备
- # @param re_params:
- # @return:
- # """
- # url = self.appUrl + '/platform/api/device/verify-device'
- # return requests.get(url, params=re_params, headers=self.business_unify_headers())
- #
- # def query_device_status(self, **re_params):
- # """
- # 查询设备状态
- # @param re_params:
- # @return:
- # """
- # url = self.appUrl + '/platform/api/device/device-status'
- # return requests.get(url, params=re_params, headers=self.business_unify_headers())
- #
- # def update_device_state(self, **re_data):
- # """
- # 修改设备状态
- # @param re_data:
- # @return:
- # """
- # url = self.appUrl + '/platform/api/device/device-state'
- # headers = self.business_unify_headers()
- # headers['content-type'] = 'application/json'
- # return requests.post(url, data=json.dumps(re_data), headers=headers)
- #
- # def query_device_usage_history(self, **re_params):
- # """
- # 查询设备用量历史
- # @param re_params:
- # @return:
- # """
- # url = self.appUrl + '/platform/api/usage/device-usage-history'
- # return requests.get(url, params=re_params, headers=self.business_unify_headers())
- #
- # def query_current_renew_list_usage_details(self, **re_params):
- # """
- # 查询设备当前队列用量详情
- # @param re_params:
- # @return:
- # """
- # url = self.appUrl + '/platform/api/usage/current-renewlist-usage-details'
- # return requests.get(url, params=re_params, headers=self.business_unify_headers())
- #
- # def get_device_batch_detail(self, **re_data):
- # """
- # 查询设备当前队列用量详情
- # @return:
- # """
- # url = self.appUrl + '/platform/api/device/batch-detail'
- # headers = self.business_unify_headers()
- # headers['content-type'] = 'application/json'
- # return requests.post(url, data=json.dumps(re_data), headers=headers)
- #
- # def query_package_list(self, **re_params):
- # """
- # 查询套餐列表
- # @return:
- # """
- # url = self.appUrl + '/platform/api/package/list'
- # return requests.get(url, params=re_params, headers=self.business_unify_headers())
- #
- # def query_renewal_list(self, **re_params):
- # """
- # 续费套餐列表
- # @param re_params:
- # @return:
- # """
- # url = self.appUrl + '/platform/api/package/list'
- # return requests.get(url, params=re_params, headers=self.business_unify_headers())
- #
- # def async_buy_package(self, **re_data):
- # """
- # 查询设备当前队列用量详情
- # @return:
- # """
- # url = self.appUrl + '/platform/api/package/async-buy-package'
- # headers = self.business_unify_headers()
- # headers['content-type'] = 'application/json'
- # return requests.post(url, data=json.dumps(re_data), headers=headers)
- #
- #
- # if __name__ == '__main__':
- # price = '12.13'
- # print(float(price))
- # discount = '6'
- # dd = Decimal(price) - Decimal(discount)
- # print(dd.quantize(Decimal('0.00')))
- #
- # # data = {'foo': 1, 'bar': 2, 'baz': 3}
- # # print(unicom_api.createSign(**data))
- # unicom_api = UnicomObjeect()
- # # result = unicom_api.generate_token()
- # # result = unicom_api.refresh_token('5d0c0f30-99bd-4f17-9614-3524495b05d4')
- # params = {'iccids': '89860620180077842020'}
- # # response = unicom_api.verify_device(**params)
- # # response = unicom_api.query_device_status(**params)
- # # response = unicom_api.update_device_state(**params)
- # # response = unicom_api.query_device_usage_history(**params)
- # # response = unicom_api.query_current_renew_list_usage_details(**params)
- # # unicom_api.get_device_batch_detail()
- # response = unicom_api.query_package_list()
- # # response = unicom_api.query_renewal_list(**params)
- #
- # if response.status_code == 200:
- # res_dict = json.loads(response.text)
- # print(res_dict)
- # response_json = {
- # "success": True,
- # "msg": "操作成功",
- # "code": 0,
- # "data": {
- # "iccid": "8986062018007784202",
- # "completeIccid": "89860620180077842020",
- # "status": 1
- # }
- # }
- # print(response_json['data']['iccid'])
- # print(response.status_code)
|