123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- # -*- encoding: utf-8 -*-
- """
- @File : UnicomObject.py
- @Time : 2022/6/17 11:03
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import base64
- import json
- import logging
- import time
- from decimal import Decimal
- import requests
- from Crypto.Cipher import AES
- from Ansjer.config import unicomAppUrl, unicomAppId, unicomAppSecret, unicomTenantId, \
- unicomEncodeKey, unicomIvKey, unicomUserName, unicomPassword, unicomPushKey, SERVER_DOMAIN_SSL, UNICOM_KEY
- from Model.models import UnicomDeviceInfo
- from Object.RedisObject import RedisObject
- from Object.utils import SM3Util
- from Object.utils.SymmetricCryptoUtil import AESencrypt
- from Service.TelecomService import TelecomService
- """
- 联通4Gapi
- 具体参数查看接口文档
- https://www.showdoc.com.cn/unicomJYHapi/8158648460007467
- """
# Module-level logger; 'info' is the logger name this file has always used.
logger = logging.getLogger('info')


class UnicomObjeect:
    """Client for the China Unicom 4G card platform HTTP API.

    See the interface documentation for the exact request parameters:
    https://www.showdoc.com.cn/unicomJYHapi/8158648460007467

    Business GET endpoints send their arguments as query parameters, while
    business POST endpoints send a JSON body with
    ``content-type: application/json``.
    """

    # Seconds to wait on the platform before a request is abandoned, so a
    # stalled connection cannot block the calling worker forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Load the platform credentials and endpoints from project config."""
        self.appUrl = unicomAppUrl
        self.appId = unicomAppId
        self.appSecret = unicomAppSecret
        self.tenantId = unicomTenantId
        self.encodeKey = unicomEncodeKey
        self.ivKey = unicomIvKey
        self.username = unicomUserName
        self.password = unicomPassword
        self.pushKey = unicomPushKey
        # Base headers shared by every request. Treated as read-only: request
        # methods work on copies produced by _build_headers().
        self.headers = {'Tenant': self.tenantId, 'content-type': 'application/x-www-form-urlencoded'}

    def _build_headers(self, authorization, content_type=None):
        """Return a fresh header dict for a single request.

        A copy is returned so that per-request changes (Authorization value,
        JSON content type) never leak into ``self.headers`` and therefore
        never affect later requests made through the same instance.

        @param authorization: value for the ``Authorization`` header
        @param content_type: optional override for ``content-type``
        @return: new dict of headers
        """
        headers = dict(self.headers)
        headers['Authorization'] = authorization
        if content_type is not None:
            headers['content-type'] = content_type
        return headers

    @staticmethod
    def _device_status(res_dict):
        """Return ``res_dict['data']['status']``, or None when it is absent.

        @param res_dict: parsed response of the device detail query, or None
        @return: the status value, or None if the response is unusable
        """
        if not isinstance(res_dict, dict):
            return None
        data = res_dict.get('data')
        if not isinstance(data, dict):
            return None
        return data.get('status')

    # Dependency note kept from the original source: pip install snowland-smx
    # (presumably the package behind SM3Util -- confirm).
    def createSign(self, reverse=False, **sign_params):
        """Build the SM3 signature for a set of request parameters.

        Algorithm, as described by the platform documentation:
          1. Sort all parameters (except the signature itself) by name, in
             ascending order, e.g. ``foo=1, bar=2, baz=3`` becomes
             ``bar=2, baz=3, foo=1``.
          2. Join them as ``name=value`` pairs separated by ``&`` with no
             surrounding whitespace: ``bar=2&baz=3&foo=1``.
          3. Append ``&key=<pushKey>``, hash the string with SM3 and
             upper-case the result.

        Pairs whose name or value is falsy are left out of the signed string.

        @param reverse: sort the parameter names in descending order instead
        @param sign_params: parameters to sign
        @return: upper-cased SM3 digest string
        """
        ordered = sorted(sign_params.items(), key=lambda item: item[0], reverse=reverse)
        pairs = ['{}={}'.format(name, value) for name, value in ordered if name and value]
        plain = '&'.join(pairs) + '&key={}'.format(self.pushKey)
        return SM3Util.Hash_sm3(plain).upper()

    def get_login_authorization(self):
        """Build the Authorization header value used for the login request.

        This differs from the header used after login: it is the string
        ``appId + ":" + appSecret`` encoded as base64 and prefixed with
        ``"Basic "``.

        @return: "Basic " + base64_data
        """
        voucher = '{}:{}'.format(self.appId, self.appSecret)
        return 'Basic ' + base64.b64encode(voucher.encode('utf-8')).decode('utf-8')

    def get_encode_password(self):
        """Encrypt the account password with AES (CBC mode, zero padding).

        @return: encrypted password as produced by AESencrypt.encryptFromString
        """
        cipher = AESencrypt(
            self.encodeKey.encode('utf-8'),
            AES.MODE_CBC,
            self.ivKey.encode('utf-8'),
            paddingMode="ZeroPadding",
            characterSet='utf-8',
        )
        return cipher.encryptFromString(self.password)

    def generate_token(self):
        """Return an access token, reusing the one cached in Redis if present.

        When no token is cached, one is requested from ``/auc/oauth/token``
        and stored under ``UNICOM_KEY`` with the lifetime the platform
        reports in ``expires_in``.

        @return: token
        @raise KeyError: the response has no ``access_token``/``expires_in``
        """
        redis = RedisObject()
        token = redis.get_data(UNICOM_KEY)
        if token:
            return token

        url = self.appUrl + '/auc/oauth/token'
        body = {
            'username': self.username,
            'password': self.get_encode_password(),
            'grant_type': 'password',
            'scope': 'server',
        }
        headers = self._build_headers(self.get_login_authorization())
        response = requests.post(url, data=body, headers=headers, timeout=self.REQUEST_TIMEOUT)
        payload = json.loads(response.text)
        token = payload['access_token']
        expires_in = payload['expires_in']
        # NOTE(review): setnx followed by expire is two separate commands; if
        # CONN is a redis-py client a single set(..., ex=..., nx=True) would
        # be atomic -- confirm before changing.
        redis.CONN.setnx(UNICOM_KEY, token)
        redis.CONN.expire(UNICOM_KEY, int(expires_in))
        return token

    def refresh_token(self, refresh_token):
        """Exchange a refresh token for a new token.

        @param refresh_token: refresh token issued by the platform
        @return: raw response text
        """
        url = self.appUrl + '/auc/oauth/token?grant_type=refresh_token'
        body = {'refresh_token': refresh_token, 'grant_type': 'refresh_token'}
        headers = self._build_headers(self.get_login_authorization())
        response = requests.post(url, data=body, headers=headers, timeout=self.REQUEST_TIMEOUT)
        return response.text

    def business_unify_headers(self):
        """Build the headers shared by all business requests.

        Adds ``Authorization: "Bearer " + token`` to a copy of the base
        headers, so callers may modify the returned dict freely.

        @return: headers
        """
        return self._build_headers('Bearer ' + self.generate_token())

    def _json_headers(self):
        """Return business headers with a JSON content type."""
        headers = self.business_unify_headers()
        headers['content-type'] = 'application/json'
        return headers

    # Business API. GET and POST use different payload encodings: POST sends
    # application/json.
    def verify_device(self, **re_params):
        """Verify a device.

        @param re_params: query parameters; ``appId`` is filled in here
        @return: requests response
        """
        re_params['appId'] = self.appId
        url = self.appUrl + '/cop-platform/api/device/verify-device'
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def query_device_status(self, **re_params):
        """Query the detail/status record of a device.

        @param re_params: query parameters; ``appId`` is filled in here
        @return: requests response
        """
        url = self.appUrl + '/cop-platform/api/device/detail'
        re_params['appId'] = self.appId
        return requests.get(url, params=re_params, headers=self._json_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def update_device_state(self, **re_data):
        """Request an asynchronous device state change.

        ``appId`` and the ``callbackUrl`` for the change notification are
        filled in here.

        @param re_data: JSON body fields (e.g. ``iccid`` and ``status``)
        @return: requests response
        """
        url = self.appUrl + '/cop-platform/api/device/async-device-state'
        headers = self._json_headers()
        re_data['appId'] = self.appId
        re_data['callbackUrl'] = SERVER_DOMAIN_SSL + 'unicom/api/device-status-change'
        return requests.post(url, data=json.dumps(re_data), headers=headers,
                             timeout=self.REQUEST_TIMEOUT)

    def query_device_usage_history(self, **re_params):
        """Query the usage history of a device.

        @param re_params: query parameters; ``appId`` is filled in here
        @return: requests response
        """
        headers = self.business_unify_headers()
        re_params['appId'] = self.appId
        url = self.appUrl + '/cop-platform/api/usage/device-usage-history'
        return requests.get(url, params=re_params, headers=headers, timeout=self.REQUEST_TIMEOUT)

    def query_current_renew_list_usage_details(self, **re_params):
        """Query the usage details of the device's current package queue.

        @param re_params: query parameters; ``appId`` is filled in here
        @return: requests response
        """
        url = self.appUrl + '/cop-platform/api/device/current-package-usage-details'
        re_params['appId'] = self.appId
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def get_device_batch_detail(self, **re_data):
        """POST to the device batch-detail endpoint.

        NOTE(review): this path starts with ``/platform`` while the device
        endpoints above use ``/cop-platform`` -- confirm this is intended.

        @param re_data: JSON body fields
        @return: requests response
        """
        url = self.appUrl + '/platform/api/device/batch-detail'
        return requests.post(url, data=json.dumps(re_data), headers=self._json_headers(),
                             timeout=self.REQUEST_TIMEOUT)

    def query_package_list(self, **re_params):
        """Query the package list.

        @param re_params: query parameters
        @return: requests response
        """
        url = self.appUrl + '/platform/api/package/list'
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def query_renewal_list(self, **re_params):
        """Query the renewal package list.

        NOTE(review): this calls the same URL as query_package_list --
        confirm whether a dedicated renewal endpoint was intended.

        @param re_params: query parameters
        @return: requests response
        """
        url = self.appUrl + '/platform/api/package/list'
        return requests.get(url, params=re_params, headers=self.business_unify_headers(),
                            timeout=self.REQUEST_TIMEOUT)

    def async_buy_package(self, **re_data):
        """POST to the asynchronous package purchase endpoint.

        @param re_data: JSON body fields
        @return: requests response
        """
        url = self.appUrl + '/platform/api/package/async-buy-package'
        return requests.post(url, data=json.dumps(re_data), headers=self._json_headers(),
                             timeout=self.REQUEST_TIMEOUT)

    @staticmethod
    def get_text_dict(result):
        """Parse a response body into a dict.

        @param result: response object exposing ``status_code`` and ``text``
        @return: parsed JSON when the status code is 200, otherwise None
        """
        if result.status_code != 200:
            return None
        return json.loads(result.text)

    @staticmethod
    def get_flow_total_usage(key, expire=600, **usage_data):
        """Return the total flow used by a SIM, cached in Redis.

        On a cache miss the current-cycle usage is read from the platform and
        reconciled with the ``UnicomDeviceInfo`` row:

        * stored cycle usage greater than the API value: a new billing cycle
          is assumed, so the stored cycle usage is folded into the historical
          total and the cycle counter is replaced by the API value;
        * API value greater than the stored one: the stored cycle usage is
          updated;
        * otherwise the stored figures are used as they are.

        If the platform query fails or carries no usable data, the database
        is left untouched and the last stored total is returned. Treating a
        failed query as "0 used" would be mistaken for a cycle rollover and
        double-count the cycle once the platform answers again.

        @param key: cache key
        @param expire: cache lifetime in seconds
        @param usage_data: query parameters; must contain ``iccid``
        @return: Decimal total; when no SIM row exists, the API cycle usage
                 quantized to two decimals (not cached)
        """
        two_places = Decimal('0.00')
        redis = RedisObject()
        cached_total = redis.get_data(key)
        if cached_total:
            return Decimal(cached_total).quantize(two_places)

        # The device detail response includes the usage of the current cycle.
        client = UnicomObjeect()
        details = client.get_text_dict(client.query_device_status(**usage_data))
        data = details.get('data') if details and details.get('success') else None
        has_reading = isinstance(data, dict)
        raw_cycle = (data.get('simCycleUsedFlow') or 0) if has_reading else 0
        cycle_total = Decimal(raw_cycle).quantize(two_places)

        # Look up the stored SIM record.
        sim_qs = UnicomDeviceInfo.objects.filter(iccid=usage_data['iccid'])
        sim_vo = sim_qs.first()
        if sim_vo is None:
            return cycle_total

        n_time = int(time.time())
        stored_cycle = sim_vo.sim_cycle_used_flow
        if not has_reading:
            # No trustworthy API value: report what the database knows.
            sim_flow_used_total = sim_vo.sim_used_flow + stored_cycle
        elif stored_cycle != 0 and stored_cycle > cycle_total:
            # Stored cycle usage exceeds the API value: a new cycle started.
            sim_used_flow = sim_vo.sim_used_flow + stored_cycle
            sim_qs.update(
                sim_used_flow=sim_used_flow,
                sim_cycle_used_flow=cycle_total,
                updated_time=n_time,
            )
            # historical total + previous cycle + current cycle
            sim_flow_used_total = sim_used_flow + cycle_total
        elif cycle_total > stored_cycle:
            # API usage moved ahead of the stored value: refresh the record.
            sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time)
            # historical total + current cycle
            sim_flow_used_total = sim_vo.sim_used_flow + cycle_total
        else:
            sim_flow_used_total = sim_vo.sim_used_flow + stored_cycle

        redis.CONN.setnx(key, str(sim_flow_used_total))
        redis.CONN.expire(key, expire)
        return sim_flow_used_total

    @staticmethod
    def get_flow_usage_total(iccid, card_type=0):
        """Return the historical total usage of a card, in MB.

        Defaults to the Zhuhai Unicom lookup.

        @param card_type: card type, 0: Zhuhai Unicom, 3: Dingxin Telecom
        @param iccid: 20-digit card number
        @return: total flow used by the current package
        """
        flow_key = 'ASJ:UNICOM:FLOW:{}'.format(iccid)
        expire_time = 60 * 10 + 60
        if card_type == 3:  # Dingxin Telecom
            return TelecomService().query_total_usage_by_access_number(iccid, flow_key, expire_time)
        # Zhuhai Unicom
        return UnicomObjeect.get_flow_total_usage(flow_key, expire_time, iccid=iccid)

    @staticmethod
    def change_device_to_activate(iccid, card_type=0, access_number='', reason=''):
        """Activate the card identified by ``iccid`` if it is not active yet.

        Dingxin Telecom cards (card type 3) are handled through
        TelecomService with the 'DEL' operation; all other cards are
        activated through the Unicom platform when their status is not 1.

        @param reason: reason for the change
        @param access_number: 11-digit access number
        @param card_type: 0: Unicom, 3: Dingxin Telecom
        @param iccid: 20-digit ICCID
        @return: the TelecomService result when ``card_type`` is 3 and an
                 access number is given; True when the card was handled;
                 None when the card is unknown, ``iccid`` is empty, or the
                 platform did not report a status
        """
        if card_type == 3 and access_number:
            return TelecomService.update_access_number_network(iccid, access_number, 'DEL', reason)

        card = UnicomDeviceInfo.objects.filter(iccid=iccid).values('card_type', 'access_number').first()
        if card is None:
            return None
        if card['card_type'] == 3:
            TelecomService.update_access_number_network(iccid, card['access_number'], 'DEL', reason)
            return True
        if not iccid:
            return None

        client = UnicomObjeect()
        res_dict = client.get_text_dict(client.query_device_status(iccid=iccid))
        status = UnicomObjeect._device_status(res_dict)
        if status is None:
            # Without a reported status no state change is issued.
            logger.warning('activate iccid %s: no status in response: %s', iccid, res_dict)
            return None
        # Activate unless already active. 1: active; 3: disabled
        if status != 1:
            client.update_device_state(iccid=iccid, status=1)
        return True

    @staticmethod
    def change_device_to_disable(iccid, card_type=0, access_number='', reason=''):
        """Disable the card identified by ``iccid`` and check the result.

        Dingxin Telecom cards (card type 3) are handled through
        TelecomService with the 'ADD' operation. For other cards the disable
        request is sent, the status is queried, and the request is sent once
        more when the card is not reported as disabled (status 3) -- which
        includes the case where no status could be read.

        @param reason: reason for the change
        @param access_number: 11-digit access number
        @param card_type: 0: Unicom, 3: Dingxin Telecom
        @param iccid: 20-digit ICCID
        @return: the TelecomService result when ``card_type`` is 3 and an
                 access number is given; True when the card was handled;
                 None when the card is unknown or ``iccid`` is empty
        """
        if card_type == 3 and access_number:
            return TelecomService.update_access_number_network(iccid, access_number, 'ADD', reason)

        card = UnicomDeviceInfo.objects.filter(iccid=iccid).values('card_type', 'access_number').first()
        if card is None:
            return None
        if card['card_type'] == 3:
            TelecomService.update_access_number_network(iccid, card['access_number'], 'ADD', reason)
            return True
        if not iccid:
            return None

        client = UnicomObjeect()
        response = client.update_device_state(iccid=iccid, status=3)
        logger.info('停用iccid响应结果:{}'.format(response.text))
        # Check whether the card is now disabled.
        res_dict = client.get_text_dict(client.query_device_status(iccid=iccid))
        logger.info('查询iccid状态:{}'.format(res_dict))
        if UnicomObjeect._device_status(res_dict) != 3:
            response = client.update_device_state(iccid=iccid, status=3)
            logger.info('再次停卡:{}'.format(response.text))
        return True

    @staticmethod
    def current_sim_traffic_usage_details(icc_id):
        """Return the cached ``flowTotalUsage`` figure of a SIM card.

        Reads ``ASJ:UNICOM:FLOW:<icc_id>`` first and mirrors a hit into
        ``ASJ:SIM:TRAFFIC:<icc_id>`` for 24 hours; on a miss the mirror key
        is used instead.

        NOTE(review): get_flow_total_usage stores a plain number string under
        the same ``ASJ:UNICOM:FLOW`` key, which has no ``['data']`` entry, so
        such a value ends up in the error branch below -- confirm which
        writer this reader is meant to pair with.

        @param icc_id: 20-digit ICCID
        @return: the usage value; 0 when nothing is cached; on any exception
                 a text message describing the error (kept for existing
                 callers)
        """
        try:
            redis = RedisObject()
            traffic_key = 'ASJ:UNICOM:FLOW:{}'.format(icc_id)
            traffic_sys = 'ASJ:SIM:TRAFFIC:{}'.format(icc_id)
            traffic_val = redis.get_data(traffic_key)
            if traffic_val:
                traffic_dict = json.loads(traffic_val)
                redis.set_data(key=traffic_sys, val=traffic_val, expire=60 * 60 * 24)
            else:
                traffic_val = redis.get_data(traffic_sys)
                if not traffic_val:
                    return 0
                traffic_dict = json.loads(traffic_val)
            return traffic_dict['data']['flowTotalUsage']
        except Exception as e:
            meg = '异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))
            return meg
|