# NOTE(review): this whole module is commented out -- importing it defines
# nothing. Every line is kept behind a `#` so the file remains valid Python.
# The line structure and indentation of the disabled code were reconstructed
# from its syntax; re-check them before re-enabling anything.
#
# # -*- encoding: utf-8 -*-
# """
# @File : UnicomObject.py
# @Time : 2022/6/17 11:03
# @Author : stephen
# @Email : zhangdongming@asj6.wecom.work
# @Software: PyCharm
# """
# import base64
# import json
#
# import requests
# from Crypto.Cipher import AES
# from decimal import Decimal
#
# from pysmx.SM3 import hash_msg as sm3_msg
#
# from Ansjer.config import unicomAppUrl, unicomAppId, unicomAppSecret, unicomTenantId, \
#     unicomEncodeKey, unicomIvKey, unicomUserName, unicomPassword, unicomPushKey
# from Object.utils.SymmetricCryptoUtil import AESencrypt
#
# """
# China Unicom 4G API.
# See the interface documentation for the exact parameters:
# https://www.showdoc.com.cn/unicomJYHapi/8158648460007467
# """
#
#
# class UnicomObjeect:
#     def __init__(self):
#         self.appUrl = unicomAppUrl
#         self.appId = unicomAppId
#         self.appSecret = unicomAppSecret
#         self.tenantId = unicomTenantId
#         self.encodeKey = unicomEncodeKey
#         self.ivKey = unicomIvKey
#         self.username = unicomUserName
#         self.password = unicomPassword
#         self.pushKey = unicomPushKey
#         # NOTE(review): the methods below do `headers = self.headers` and then
#         # assign keys on it, so they all mutate this one dict (an
#         # 'Authorization' or 'content-type' set by one call is still there on
#         # the next). Copy the dict per request if this code is re-enabled.
#         self.headers = {'Tenant': self.tenantId, 'content-type': 'application/x-www-form-urlencoded'}
#
#     # pip install snowland-smx
#     def createSign(self, reverse=False, **sign_params):
#         """
#         Build the signature (sign) that the API requires on request parameters.
#
#         Algorithm:
#         1. Sort all request parameters except the signature itself by key in
#            ascending alphabetical order, e.g. foo=1, bar=2, baz=3 becomes
#            bar=2, baz=3, foo=1.
#         2. Join each name and value as key=value and join the pairs with "&"
#            (no spaces around names or values): bar=2&baz=3&foo=1
#         3. Append the push key to the end of that string, hash it with SM3 and
#            convert the digest to upper case:
#            (SM3(key1=value1&key2=value2&...&key=pushKey)).upcase
#         @param reverse:
#         @param sign_params:
#         @return:
#         """
#         dict_2 = dict(sorted(sign_params.items(), key=lambda item: item[0], reverse=reverse))
#         data_list = []
#         for item in dict_2.items():
#             if item[0] and item[1]:
#                 data_list.append("{}={}".format(item[0], item[1]))
#         val = '&'.join(data_list)
#         push_key = '&key={}'.format(self.pushKey)
#         val = val + push_key
#         return sm3_msg(val).upper()
#
#     def get_login_authorization(self):
#         """
#         Get the login authorization value.
#         Note: the Authorization used for login differs from the one used for
#         requests made after login.
#         Algorithm: base64-encode appId + ":" + appSecret and prefix it with Basic.
#         @return: "Basic " + base64_data
#         """
#         voucher = self.appId + ':' + self.appSecret
#         base64_data = str(base64.b64encode(voucher.encode("utf-8")), "utf-8")
#         return "Basic " + base64_data
#
#     def get_encode_password(self):
#         """
#         Get the symmetrically encrypted password.
#         @return: encrypt_pwd
#         """
#         aes = AESencrypt(self.encodeKey.encode('utf-8'), AES.MODE_CBC, self.ivKey.encode('utf-8'),
#                          paddingMode="ZeroPadding",
#                          characterSet='utf-8')
#         encrypt_pwd = aes.encryptFromString(self.password)
#         return encrypt_pwd
#
#     def generate_token(self):
#         """
#         Generate a token.
#         @return: token
#         """
#         url = self.appUrl + '/auc/oauth/token'
#         pwd = self.get_encode_password()
#         body = {'username': self.username, 'password': pwd, 'grant_type': 'password', 'scope': 'server'}
#         headers = self.headers
#         headers['Authorization'] = self.get_login_authorization()
#         response_data = requests.post(url, data=body, headers=headers)
#         response_data = json.loads(response_data.text)
#         return response_data['access_token']
#
#     def refresh_token(self, refresh_token):
#         """
#         Refresh a token.
#         @param refresh_token:
#         @return:
#         """
#         url = self.appUrl + '/auc/oauth/token?grant_type=refresh_token'
#         body = {'refresh_token': refresh_token, 'grant_type': 'refresh_token'}
#         headers = self.headers
#         headers['Authorization'] = self.get_login_authorization()
#         response_data = requests.post(url, data=body, headers=headers)
#         return response_data.text
#
#     def business_unify_headers(self):
#         """
#         Unified headers for business requests.
#         Adds an Authorization header whose value is "Bearer " + token.
#         @return: headers
#         """
#         token = self.generate_token()
#         headers = self.headers
#         headers['Authorization'] = 'Bearer ' + token
#         return headers
#
#     # Business APIs. Note: GET and POST send different content types; POST uses application/json
#     def verify_device(self, **re_params):
#         """
#         Verify a device.
#         @param re_params:
#         @return:
#         """
#         url = self.appUrl + '/platform/api/device/verify-device'
#         return requests.get(url, params=re_params, headers=self.business_unify_headers())
#
#     def query_device_status(self, **re_params):
#         """
#         Query device status.
#         @param re_params:
#         @return:
#         """
#         url = self.appUrl + '/platform/api/device/device-status'
#         return requests.get(url, params=re_params, headers=self.business_unify_headers())
#
#     def update_device_state(self, **re_data):
#         """
#         Update device state.
#         @param re_data:
#         @return:
#         """
#         url = self.appUrl + '/platform/api/device/device-state'
#         headers = self.business_unify_headers()
#         headers['content-type'] = 'application/json'
#         return requests.post(url, data=json.dumps(re_data), headers=headers)
#
#     def query_device_usage_history(self, **re_params):
#         """
#         Query device usage history.
#         @param re_params:
#         @return:
#         """
#         url = self.appUrl + '/platform/api/usage/device-usage-history'
#         return requests.get(url, params=re_params, headers=self.business_unify_headers())
#
#     def query_current_renew_list_usage_details(self, **re_params):
#         """
#         Query usage details of the device's current queue.
#         @param re_params:
#         @return:
#         """
#         url = self.appUrl + '/platform/api/usage/current-renewlist-usage-details'
#         return requests.get(url, params=re_params, headers=self.business_unify_headers())
#
#     def get_device_batch_detail(self, **re_data):
#         """
#         POST re_data as JSON to /platform/api/device/batch-detail.
#         NOTE(review): the original docstring repeated the "usage details of the
#         device's current queue" text from the method above -- presumably a
#         copy-paste slip.
#         @return:
#         """
#         url = self.appUrl + '/platform/api/device/batch-detail'
#         headers = self.business_unify_headers()
#         headers['content-type'] = 'application/json'
#         return requests.post(url, data=json.dumps(re_data), headers=headers)
#
#     def query_package_list(self, **re_params):
#         """
#         Query the package list.
#         @return:
#         """
#         url = self.appUrl + '/platform/api/package/list'
#         return requests.get(url, params=re_params, headers=self.business_unify_headers())
#
#     def query_renewal_list(self, **re_params):
#         """
#         Renewal package list.
#         @param re_params:
#         @return:
#         """
#         # NOTE(review): same path as query_package_list -- confirm the renewal
#         # endpoint against the interface documentation.
#         url = self.appUrl + '/platform/api/package/list'
#         return requests.get(url, params=re_params, headers=self.business_unify_headers())
#
#     def async_buy_package(self, **re_data):
#         """
#         POST re_data as JSON to /platform/api/package/async-buy-package.
#         NOTE(review): the original docstring repeated the "usage details of the
#         device's current queue" text -- presumably a copy-paste slip.
#         @return:
#         """
#         url = self.appUrl + '/platform/api/package/async-buy-package'
#         headers = self.business_unify_headers()
#         headers['content-type'] = 'application/json'
#         return requests.post(url, data=json.dumps(re_data), headers=headers)
#
#
# if __name__ == '__main__':
#     price = '12.13'
#     print(float(price))
#     discount = '6'
#     dd = Decimal(price) - Decimal(discount)
#     print(dd.quantize(Decimal('0.00')))
#
#     # data = {'foo': 1, 'bar': 2, 'baz': 3}
#     # print(unicom_api.createSign(**data))
#     unicom_api = UnicomObjeect()
#     # result = unicom_api.generate_token()
#     # result = unicom_api.refresh_token('5d0c0f30-99bd-4f17-9614-3524495b05d4')
#     params = {'iccids': '89860620180077842020'}
#     # response = unicom_api.verify_device(**params)
#     # response = unicom_api.query_device_status(**params)
#     # response = unicom_api.update_device_state(**params)
#     # response = unicom_api.query_device_usage_history(**params)
#     # response = unicom_api.query_current_renew_list_usage_details(**params)
#     # unicom_api.get_device_batch_detail()
#     response = unicom_api.query_package_list()
#     # response = unicom_api.query_renewal_list(**params)
#
#     if response.status_code == 200:
#         res_dict = json.loads(response.text)
#         print(res_dict)
#     response_json = {
#         "success": True,
#         "msg": "操作成功",
#         "code": 0,
#         "data": {
#             "iccid": "8986062018007784202",
#             "completeIccid": "89860620180077842020",
#             "status": 1
#         }
#     }
#     print(response_json['data']['iccid'])
#     print(response.status_code)