# -*- encoding: utf-8 -*-
"""
@File    : UnicomObject.py
@Time    : 2022/6/17 11:03
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import base64
import json
import logging
import time
from decimal import Decimal

import requests
from Crypto.Cipher import AES

from Ansjer.config import unicomAppUrl, unicomAppId, unicomAppSecret, unicomTenantId, \
    unicomEncodeKey, unicomIvKey, unicomUserName, unicomPassword, unicomPushKey, SERVER_DOMAIN_SSL, UNICOM_KEY
from Model.models import UnicomDeviceInfo
from Object.RedisObject import RedisObject
from Object.utils import SM3Util
from Object.utils.SymmetricCryptoUtil import AESencrypt

# China Unicom 4G API client.
# For the exact request/response parameters see the interface documentation:
# https://www.showdoc.com.cn/unicomJYHapi/8158648460007467

logger = logging.getLogger('info')


class UnicomObjeect:
    """
    Thin client for the China Unicom 4G HTTP API (token handling, request
    signing, device/package endpoints and cached flow-usage helpers).
    """

    # Seconds to wait for the remote API before giving up; without a timeout
    # `requests` would block the calling worker indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.appUrl = unicomAppUrl
        self.appId = unicomAppId
        self.appSecret = unicomAppSecret
        self.tenantId = unicomTenantId
        self.encodeKey = unicomEncodeKey
        self.ivKey = unicomIvKey
        self.username = unicomUserName
        self.password = unicomPassword
        self.pushKey = unicomPushKey
        # Base headers; request methods work on copies and never mutate this dict.
        self.headers = {'Tenant': self.tenantId, 'content-type': 'application/x-www-form-urlencoded'}

    # pip install snowland-smx
    def createSign(self, reverse=False, **sign_params):
        """
        Build the request signature.

        Algorithm: sort all request parameters (except the signature itself)
        by key, join them as ``key=value`` pairs with ``&`` (no surrounding
        spaces), append ``&key=<pushKey>``, hash the string with SM3 and
        upper-case the digest.
        Example: foo=1, bar=2, baz=3 is signed as ``bar=2&baz=3&foo=1&key=...``.

        Parameters whose name or value is falsy are left out of the string.

        @param reverse: sort keys in descending order instead of ascending
        @param sign_params: request parameters to sign
        @return: upper-cased SM3 digest
        """
        ordered = sorted(sign_params.items(), key=lambda item: item[0], reverse=reverse)
        pairs = ["{}={}".format(name, value) for name, value in ordered if name and value]
        val = '&'.join(pairs) + '&key={}'.format(self.pushKey)
        return SM3Util.Hash_sm3(val).upper()

    def get_login_authorization(self):
        """
        Build the Authorization value used for the login (token) request.

        This differs from the Authorization used after login:
        it is "Basic " followed by base64(appId + ":" + appSecret).

        @return: "Basic " + base64_data
        """
        voucher = self.appId + ':' + self.appSecret
        base64_data = base64.b64encode(voucher.encode("utf-8")).decode("utf-8")
        return "Basic " + base64_data

    def get_encode_password(self):
        """
        Encrypt the account password with AES (CBC mode, zero padding) using
        the configured key and IV.

        @return: encrypt_pwd
        """
        aes = AESencrypt(self.encodeKey.encode('utf-8'), AES.MODE_CBC, self.ivKey.encode('utf-8'),
                         paddingMode="ZeroPadding", characterSet='utf-8')
        return aes.encryptFromString(self.password)

    def generate_token(self):
        """
        Return an access token, requesting a new one when none is cached.

        The token is read from Redis under UNICOM_KEY; on a miss it is fetched
        with the password grant and stored with the ``expires_in`` reported by
        the server as its time-to-live.

        @return: token
        """
        redis = RedisObject()
        token = redis.get_data(UNICOM_KEY)
        if token:
            return token
        url = self.appUrl + '/auc/oauth/token'
        body = {'username': self.username, 'password': self.get_encode_password(),
                'grant_type': 'password', 'scope': 'server'}
        # Copy so the Basic credential never leaks into self.headers.
        headers = dict(self.headers)
        headers['Authorization'] = self.get_login_authorization()
        response = requests.post(url, data=body, headers=headers, timeout=self.REQUEST_TIMEOUT)
        response_data = json.loads(response.text)
        token = response_data['access_token']
        expires_in = response_data['expires_in']
        redis.CONN.setnx(UNICOM_KEY, token)
        redis.CONN.expire(UNICOM_KEY, int(expires_in))
        return token

    def refresh_token(self, refresh_token):
        """
        Exchange a refresh token for a new token.

        @param refresh_token: refresh token issued by the token endpoint
        @return: raw response body text
        """
        url = self.appUrl + '/auc/oauth/token?grant_type=refresh_token'
        body = {'refresh_token': refresh_token, 'grant_type': 'refresh_token'}
        headers = dict(self.headers)
        headers['Authorization'] = self.get_login_authorization()
        response = requests.post(url, data=body, headers=headers, timeout=self.REQUEST_TIMEOUT)
        return response.text

    def business_unify_headers(self):
        """
        Build the headers shared by all business requests: the base headers
        plus ``Authorization: "Bearer " + token``.

        A new dict is returned on every call, so callers may adjust it (for
        example the content type) without affecting other requests.

        @return: headers
        """
        headers = dict(self.headers)
        headers['Authorization'] = 'Bearer ' + self.generate_token()
        return headers

    # Business APIs. GET and POST use different content types:
    # POST requests are sent as application/json.

    def verify_device(self, **re_params):
        """
        Verify a device.

        @param re_params: query parameters (appId is added automatically)
        @return: requests response
        """
        re_params['appId'] = self.appId
        url = self.appUrl + '/cop-platform/api/device/verify-device'
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def query_device_status(self, **re_params):
        """
        Query the device detail/status.

        @param re_params: query parameters (appId is added automatically)
        @return: requests response
        """
        url = self.appUrl + '/cop-platform/api/device/detail'
        re_params['appId'] = self.appId
        headers = self.business_unify_headers()
        headers['content-type'] = 'application/json'
        return requests.get(url, params=re_params, headers=headers, timeout=self.REQUEST_TIMEOUT)

    def update_device_state(self, **re_data):
        """
        Change the device state asynchronously; the result is reported to the
        ``unicom/api/device-status-change`` callback of this server.

        @param re_data: request body fields (appId and callbackUrl are added automatically)
        @return: requests response
        """
        url = self.appUrl + '/cop-platform/api/device/async-device-state'
        headers = self.business_unify_headers()
        headers['content-type'] = 'application/json'
        re_data['appId'] = self.appId
        re_data['callbackUrl'] = SERVER_DOMAIN_SSL + 'unicom/api/device-status-change'
        return requests.post(url, data=json.dumps(re_data), headers=headers, timeout=self.REQUEST_TIMEOUT)

    def query_device_usage_history(self, **re_params):
        """
        Query the device usage history.

        @param re_params: query parameters (appId is added automatically)
        @return: requests response
        """
        headers = self.business_unify_headers()
        re_params['appId'] = self.appId
        url = self.appUrl + '/cop-platform/api/usage/device-usage-history'
        return requests.get(url, params=re_params, headers=headers, timeout=self.REQUEST_TIMEOUT)

    def query_current_renew_list_usage_details(self, **re_params):
        """
        Query the usage details of the device's current package queue.

        @param re_params: query parameters (appId is added automatically)
        @return: requests response
        """
        url = self.appUrl + '/cop-platform/api/device/current-package-usage-details'
        re_params['appId'] = self.appId
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def get_device_batch_detail(self, **re_data):
        """
        POST to the device batch-detail endpoint.

        @param re_data: request body fields, sent as JSON
        @return: requests response
        """
        url = self.appUrl + '/platform/api/device/batch-detail'
        headers = self.business_unify_headers()
        headers['content-type'] = 'application/json'
        return requests.post(url, data=json.dumps(re_data), headers=headers, timeout=self.REQUEST_TIMEOUT)

    def query_package_list(self, **re_params):
        """
        Query the package list.

        @param re_params: query parameters
        @return: requests response
        """
        url = self.appUrl + '/platform/api/package/list'
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def query_renewal_list(self, **re_params):
        """
        Query the renewal package list.

        NOTE(review): this calls the same URL as query_package_list
        ('/platform/api/package/list') - confirm against the API documentation
        that no dedicated renewal endpoint was intended.

        @param re_params: query parameters
        @return: requests response
        """
        url = self.appUrl + '/platform/api/package/list'
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def async_buy_package(self, **re_data):
        """
        POST to the asynchronous buy-package endpoint.

        @param re_data: request body fields, sent as JSON
        @return: requests response
        """
        url = self.appUrl + '/platform/api/package/async-buy-package'
        headers = self.business_unify_headers()
        headers['content-type'] = 'application/json'
        return requests.post(url, data=json.dumps(re_data), headers=headers, timeout=self.REQUEST_TIMEOUT)

    @staticmethod
    def get_text_dict(result):
        """
        Convert a response into the decoded JSON body.

        @param result: requests response
        @return: decoded JSON when the status code is 200 and the body is
                 valid JSON, otherwise None
        """
        if result.status_code != 200:
            return None
        try:
            return json.loads(result.text)
        except ValueError:
            # A 200 response with an unparseable body is treated like a failed call.
            return None

    @staticmethod
    def _device_status(client, iccid):
        """
        Query the device detail and extract its status.

        @param client: UnicomObjeect instance used for the request
        @param iccid: SIM iccid
        @return: (decoded response or None, status or None when unavailable)
        """
        res_dict = client.get_text_dict(client.query_device_status(iccid=iccid))
        data = res_dict.get('data') if isinstance(res_dict, dict) else None
        status = data.get('status') if isinstance(data, dict) else None
        return res_dict, status

    @staticmethod
    def get_flow_total_usage(key, expire=600, **usage_data):
        """
        Total flow used by the device's current package queue (cached).

        On a cache miss the current cycle usage (``simCycleUsedFlow``) is read
        from the device-detail API and reconciled with the UnicomDeviceInfo
        row for ``usage_data['iccid']``; the resulting total is cached under
        ``key``.

        @param key: cache key
        @param expire: cache time-to-live in seconds
        @param usage_data: query parameters, must contain 'iccid'
        @return: total used flow; a Decimal quantized to two places when it
                 comes from the cache or when no database row exists
        """
        two_places = Decimal('0.00')
        redis = RedisObject()
        cached_total = redis.get_data(key)
        if cached_total:
            return Decimal(cached_total).quantize(two_places)

        # The device detail returned by the API includes the cycle flow usage.
        client = UnicomObjeect()
        details = client.get_text_dict(client.query_device_status(**usage_data))
        api_ok = bool(details and details.get('success'))
        raw_cycle = (details.get('data') or {}).get('simCycleUsedFlow', 0) if api_ok else 0
        cycle_total = Decimal(str(raw_cycle or 0)).quantize(two_places)

        # Look up the SIM card record.
        sim_qs = UnicomDeviceInfo.objects.filter(iccid=usage_data['iccid'])
        sim_vo = sim_qs.first()
        if sim_vo is None:
            return cycle_total

        n_time = int(time.time())
        stored_cycle = sim_vo.sim_cycle_used_flow
        if not api_ok:
            # The zero above is a placeholder, not a measurement: do not let it
            # trigger the cycle rollover below; report the stored values instead.
            logger.info('unicom device status query failed, iccid:{}'.format(usage_data['iccid']))
            sim_flow_used_total = sim_vo.sim_used_flow + stored_cycle
        elif stored_cycle != 0 and stored_cycle > cycle_total:
            # Stored cycle usage exceeds what the API reports, so a new cycle
            # has started: fold the previous cycle into the history total.
            sim_used_flow = sim_vo.sim_used_flow + stored_cycle
            sim_qs.update(
                sim_used_flow=sim_used_flow,
                sim_cycle_used_flow=cycle_total,
                updated_time=n_time
            )
            # history total + previous cycle + current cycle = total consumed
            sim_flow_used_total = sim_used_flow + cycle_total
        elif cycle_total > stored_cycle:
            # API cycle usage is ahead of the database: update the record.
            sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time)
            # history total + current cycle = total consumed
            sim_flow_used_total = sim_vo.sim_used_flow + cycle_total
        else:
            sim_flow_used_total = sim_vo.sim_used_flow + stored_cycle
        redis.CONN.setnx(key, str(sim_flow_used_total))
        redis.CONN.expire(key, expire)
        return sim_flow_used_total

    @staticmethod
    def get_flow_usage_total(iccid):
        """
        Total flow used so far by the current package queue of a SIM.

        @param iccid: Unicom iccid
        @return: flow_total_usage, total flow used by the current package queue
        """
        flow_key = 'ASJ:UNICOM:FLOW:{}'.format(iccid)
        expire_time = 60 * 10 + 60
        return UnicomObjeect().get_flow_total_usage(flow_key, expire_time, iccid=iccid)

    @staticmethod
    def change_device_to_activate(iccid):
        """
        Activate the device unless it is already active.

        Status codes: 1 = active, 3 = disabled. When the current status cannot
        be determined the activation request is sent anyway.

        @param iccid: SIM iccid
        @return: True when an iccid was given, otherwise None
        """
        if not iccid:
            return None
        client = UnicomObjeect()
        _, status = UnicomObjeect._device_status(client, iccid)
        if status != 1:
            client.update_device_state(iccid=iccid, status=1)
        return True

    @staticmethod
    def change_device_to_disable(iccid):
        """
        Disable the device, check the result and retry once if needed.

        The disable request is sent again when the follow-up status query does
        not report status 3 (disabled), including when the status cannot be
        determined.

        @param iccid: SIM iccid
        @return: True when an iccid was given, otherwise None
        """
        if not iccid:
            return None
        client = UnicomObjeect()
        response = client.update_device_state(iccid=iccid, status=3)
        logger.info('停用iccid响应结果:{}'.format(response.text))
        # Check whether the device was actually disabled.
        res_dict, status = UnicomObjeect._device_status(client, iccid)
        logger.info('查询iccid状态:{}'.format(res_dict))
        if status != 3:
            response = client.update_device_state(iccid=iccid, status=3)
            logger.info('再次停卡:{}'.format(response.text))
        return True

    @staticmethod
    def current_sim_traffic_usage_details(icc_id):
        """
        Flow usage of the given SIM card read from the Redis caches.

        The value under 'ASJ:UNICOM:FLOW:<iccid>' is preferred and mirrored to
        'ASJ:SIM:TRAFFIC:<iccid>' for 24 hours; the mirror is used when the
        first key is missing.

        NOTE(review): get_flow_usage_total writes a plain number string to
        'ASJ:UNICOM:FLOW:<iccid>', which has no ['data']['flowTotalUsage'] and
        therefore ends in the error branch here - confirm which writer this
        method is meant to read from.

        @param icc_id: 20-digit iccid
        @return: flowTotalUsage from the cached payload, 0 when nothing is
                 cached, or an error message string when an exception occurs
        """
        try:
            redis = RedisObject()
            traffic_key = 'ASJ:UNICOM:FLOW:{}'.format(icc_id)
            traffic_sys = 'ASJ:SIM:TRAFFIC:{}'.format(icc_id)
            traffic_val = redis.get_data(traffic_key)
            if traffic_val:
                traffic_dict = json.loads(traffic_val)
                redis.set_data(key=traffic_sys, val=traffic_val, expire=60 * 60 * 24)
            else:
                traffic_val = redis.get_data(traffic_sys)
                if not traffic_val:
                    return 0
                traffic_dict = json.loads(traffic_val)
            return traffic_dict['data']['flowTotalUsage']
        except Exception as e:
            meg = '异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))
            # Record the failure; the message is still returned for callers that rely on it.
            logger.info(meg)
            return meg