| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 | 
							- # -*- encoding: utf-8 -*-
 
- """
 
- @File    : UnicomObject.py
 
- @Time    : 2022/6/17 11:03
 
- @Author  : stephen
 
- @Email   : zhangdongming@asj6.wecom.work
 
- @Software: PyCharm
 
- """
 
- import base64
 
- import datetime
 
- import json
 
- from decimal import Decimal
 
- import requests
 
- from Crypto.Cipher import AES
 
- from Ansjer.config import unicomAppUrl, unicomAppId, unicomAppSecret, unicomTenantId, \
 
-     unicomEncodeKey, unicomIvKey, unicomUserName, unicomPassword, unicomPushKey
 
- from Object.RedisObject import RedisObject
 
- from Object.utils import SM3Util
 
- from Object.utils.SymmetricCryptoUtil import AESencrypt
 
"""
China Unicom 4G API.

See the API documentation for the exact request parameters:
https://www.showdoc.com.cn/unicomJYHapi/8158648460007467
"""
 
class UnicomObjeect:
    """Client for the China Unicom 4G platform API.

    Endpoint, credentials and keys are taken from ``Ansjer.config``. Every
    business call obtains a new access token through ``generate_token`` and
    sends it as a ``Bearer`` authorization header.
    """

    def __init__(self):
        self.appUrl = unicomAppUrl
        self.appId = unicomAppId
        self.appSecret = unicomAppSecret
        self.tenantId = unicomTenantId
        self.encodeKey = unicomEncodeKey
        self.ivKey = unicomIvKey
        self.username = unicomUserName
        self.password = unicomPassword
        self.pushKey = unicomPushKey
        # Base headers shared by all requests. Methods must copy this dict
        # before adding per-request entries so it is never modified.
        self.headers = {'Tenant': self.tenantId, 'content-type': 'application/x-www-form-urlencoded'}

    # pip install snowland-smx
    def createSign(self, reverse=False, **sign_params):
        """Build the upper-case SM3 signature for a set of request parameters.

        The parameters are sorted by name, joined as ``name=value`` pairs
        with ``&``, ``&key=<pushKey>`` is appended, and the resulting string
        is hashed with SM3. Pairs whose name or value is falsy are left out
        of the signed string.

        Example: ``foo=1, bar=2, baz=3`` is signed as
        ``bar=2&baz=3&foo=1&key=<pushKey>``.

        @param reverse: sort the parameter names in descending order
        @param sign_params: request parameters to sign
        @return: upper-cased SM3 digest string
        """
        ordered = sorted(sign_params.items(), key=lambda item: item[0], reverse=reverse)
        pairs = ['{}={}'.format(name, value) for name, value in ordered if name and value]
        # The key suffix always starts with '&', even when no pair is signed.
        sign_source = '&'.join(pairs) + '&key={}'.format(self.pushKey)
        return SM3Util.Hash_sm3(sign_source).upper()

    def get_login_authorization(self):
        """Return the ``Basic`` authorization value used for token requests.

        This differs from the ``Bearer`` value used by business requests:
        it is ``"Basic "`` followed by base64 of ``appId:appSecret``.

        @return: "Basic " + base64_data
        """
        voucher = '{}:{}'.format(self.appId, self.appSecret)
        base64_data = base64.b64encode(voucher.encode('utf-8')).decode('utf-8')
        return 'Basic ' + base64_data

    def get_encode_password(self):
        """Encrypt the configured password with AES-CBC and zero padding.

        @return: encrypt_pwd
        """
        aes = AESencrypt(self.encodeKey.encode('utf-8'), AES.MODE_CBC, self.ivKey.encode('utf-8'),
                         paddingMode="ZeroPadding",
                         characterSet='utf-8')
        return aes.encryptFromString(self.password)

    def _login_headers(self):
        """Return a fresh copy of the base headers with Basic authorization."""
        headers = dict(self.headers)
        headers['Authorization'] = self.get_login_authorization()
        return headers

    def generate_token(self):
        """Request a new access token with the password grant.

        @return: token
        @raise KeyError: when the response JSON has no ``access_token``
        """
        url = self.appUrl + '/auc/oauth/token'
        body = {
            'username': self.username,
            'password': self.get_encode_password(),
            'grant_type': 'password',
            'scope': 'server',
        }
        response = requests.post(url, data=body, headers=self._login_headers())
        return json.loads(response.text)['access_token']

    def refresh_token(self, refresh_token):
        """Exchange a refresh token for a new token.

        @param refresh_token: refresh token issued by the platform
        @return: raw response body text
        """
        url = self.appUrl + '/auc/oauth/token?grant_type=refresh_token'
        body = {'refresh_token': refresh_token, 'grant_type': 'refresh_token'}
        response = requests.post(url, data=body, headers=self._login_headers())
        return response.text

    def business_unify_headers(self):
        """Return the headers for a business request.

        A new dict is returned on every call, so callers may change it (for
        example switch ``content-type``) without affecting ``self.headers``
        or later requests made with this instance.

        @return: headers with ``Authorization`` set to "Bearer " + token
        """
        headers = dict(self.headers)
        headers['Authorization'] = 'Bearer ' + self.generate_token()
        return headers

    # Business API. GET requests send query parameters with the form
    # content type; POST requests send a JSON body as application/json.
    def _get(self, path, params):
        """Send an authorized GET request to ``path`` with query ``params``."""
        url = self.appUrl + path
        return requests.get(url, params=params, headers=self.business_unify_headers())

    def _post_json(self, path, payload):
        """Send an authorized POST request to ``path`` with a JSON body."""
        url = self.appUrl + path
        headers = self.business_unify_headers()
        headers['content-type'] = 'application/json'
        return requests.post(url, data=json.dumps(payload), headers=headers)

    def verify_device(self, **re_params):
        """Verify a device.

        @param re_params: query parameters
        @return: requests response object
        """
        return self._get('/platform/api/device/verify-device', re_params)

    def query_device_status(self, **re_params):
        """Query the status of a device.

        @param re_params: query parameters
        @return: requests response object
        """
        return self._get('/platform/api/device/device-status', re_params)

    def update_device_state(self, **re_data):
        """Change the state of a device.

        @param re_data: JSON body fields
        @return: requests response object
        """
        return self._post_json('/platform/api/device/device-state', re_data)

    def query_device_usage_history(self, **re_params):
        """Query the usage history of a device.

        @param re_params: query parameters
        @return: requests response object
        """
        return self._get('/platform/api/usage/device-usage-history', re_params)

    def query_current_renew_list_usage_details(self, **re_params):
        """Query usage details of the device's current renewal queue.

        @param re_params: query parameters
        @return: requests response object
        """
        return self._get('/platform/api/usage/current-renewlist-usage-details', re_params)

    def get_device_batch_detail(self, **re_data):
        """Query device details in batch.

        @param re_data: JSON body fields
        @return: requests response object
        """
        return self._post_json('/platform/api/device/batch-detail', re_data)

    def query_package_list(self, **re_params):
        """Query the package list.

        @param re_params: query parameters
        @return: requests response object
        """
        return self._get('/platform/api/package/list', re_params)

    def query_renewal_list(self, **re_params):
        """Query the renewal package list.

        NOTE(review): this calls the same endpoint as ``query_package_list``;
        confirm against the API documentation that this is intended.

        @param re_params: query parameters
        @return: requests response object
        """
        return self._get('/platform/api/package/list', re_params)

    def async_buy_package(self, **re_data):
        """Submit an asynchronous package purchase.

        @param re_data: JSON body fields
        @return: requests response object
        """
        return self._post_json('/platform/api/package/async-buy-package', re_data)

    @staticmethod
    def get_text_dict(result):
        """Decode a response body into a dict.

        @param result: response object exposing ``status_code`` and ``text``
        @return: parsed JSON when the status code is 200, otherwise None
        """
        if result.status_code != 200:
            return None
        return json.loads(result.text)

    @staticmethod
    def unicom_flow_usage_cache(key, expire=0, **usage_data):
        """Return device usage history, reading the Redis cache first.

        On a cache miss the usage-history endpoint is queried and the parsed
        response is cached. A failed request (non-200 response) is not
        cached, so the next call queries the endpoint again.

        @param key: cache key
        @param expire: cache expiry passed to ``RedisObject.set_data``
        @param usage_data: query parameters for the usage-history request
        @return: response dict, or None when the request failed
        """
        redis = RedisObject()
        cached = redis.get_data(key)
        if cached:
            return json.loads(cached)
        client = UnicomObjeect()
        usage_history = client.get_text_dict(client.query_device_usage_history(**usage_data))
        if usage_history is not None:
            redis.set_data(key=key, val=json.dumps(usage_history), expire=expire)
        return usage_history

    @staticmethod
    def get_flow_usage_total(usage_year, usage_month, iccid):
        """Sum the usage recorded for one month.

        @param usage_year: year to match against each history record
        @param usage_month: month to match against each history record
        @param iccid: card identifier
        @return: total ``flowTotalUsage`` of the matching records; 0 when the
            history is unavailable, unsuccessful or has no matching record
        """
        cache_key = 'ASJ:UNICOM:FLOW:{}'.format(iccid)
        usage_history = UnicomObjeect.unicom_flow_usage_cache(cache_key, (60 * 10 + 3), iccid=iccid)
        if not usage_history or not usage_history.get('success'):
            return 0
        records = (usage_history.get('data') or {}).get('deviceUsageHistory') or []
        return sum(
            item['flowTotalUsage']
            for item in records
            if item['year'] == usage_year and item['month'] == usage_month
        )

    @staticmethod
    def _queried_status(client, iccid):
        """Return the ``status`` reported by the status query, or None."""
        res_dict = client.get_text_dict(client.query_device_status(iccid=iccid))
        return ((res_dict or {}).get('data') or {}).get('status')

    @staticmethod
    def change_device_to_activate(iccid):
        """Activate the device unless the status query already reports 1.

        When the status cannot be read (failed query or missing field) the
        activation request is still sent.

        @param iccid: card identifier
        @return: True when an iccid was given, otherwise None
        """
        if not iccid:
            return None
        client = UnicomObjeect()
        # Update payload status values: 1 = activate, 2 = disable.
        if UnicomObjeect._queried_status(client, iccid) != 1:
            client.update_device_state(iccid=iccid, status=1)
        return True

    @staticmethod
    def change_device_to_disable(iccid):
        """Disable the device, then check the status and retry once if needed.

        @param iccid: card identifier
        @return: True when an iccid was given, otherwise None
        """
        if not iccid:
            return None
        client = UnicomObjeect()
        disable_payload = {'iccid': iccid, 'status': 2}
        client.update_device_state(**disable_payload)
        # NOTE(review): the query result is compared with 3 although the
        # update payload uses 2 for "disable"; confirm the status codes of
        # the device-status endpoint.
        if UnicomObjeect._queried_status(client, iccid) != 3:
            # Retry with the complete payload so the target status is sent.
            client.update_device_state(**disable_payload)
        return True
 
if __name__ == '__main__':
    # Manual scratch checks. NOTE: this sends a real device-state update
    # (status 2) for the hard-coded iccid below.
    now = datetime.datetime.today()
    year, month = now.year, now.month
    for shown in (year, month, type(year)):
        print(shown)

    empty_text = ''
    if not empty_text:
        print('为空')

    price, discount = '12.13', '6'
    print(float(price))
    remaining = Decimal(price) - Decimal(discount)
    print(remaining.quantize(Decimal('0.00')))

    unicom_api = UnicomObjeect()
    print(unicom_api.createSign(foo=1, bar=2, baz=3))

    # Other calls that can be tried with the same params:
    #   generate_token(), refresh_token('<refresh token>'),
    #   verify_device, query_device_status, query_device_usage_history,
    #   query_current_renew_list_usage_details, get_device_batch_detail,
    #   query_package_list, query_renewal_list
    params = {'iccid': '89860621330065433774', 'status': 2}
    response = unicom_api.update_device_state(**params)
    if response.status_code == 200:
        print(json.loads(response.text))

    # Sample of a successful response body.
    response_json = {
        "success": True,
        "msg": "操作成功",
        "code": 0,
        "data": {
            "iccid": "8986062018007784202",
            "completeIccid": "89860620180077842020",
            "status": 1,
        },
    }
    print(response_json['data']['iccid'])
    print(response.status_code)
 
 
  |