# -*- encoding: utf-8 -*-
"""
@File    : UnicomObject.py
@Time    : 2022/6/17 11:03
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import base64
import datetime
import json
from decimal import Decimal

import requests
from Crypto.Cipher import AES

from Ansjer.config import unicomAppUrl, unicomAppId, unicomAppSecret, unicomTenantId, \
    unicomEncodeKey, unicomIvKey, unicomUserName, unicomPassword, unicomPushKey
from Object.RedisObject import RedisObject
from Object.utils import SM3Util
from Object.utils.SymmetricCryptoUtil import AESencrypt

# China Unicom 4G API client.
# For the exact request/response parameters see the interface documentation:
# https://www.showdoc.com.cn/unicomJYHapi/8158648460007467


class UnicomObjeect:
    """
    Thin client for the China Unicom 4G platform HTTP API.

    All connection settings and credentials are read from ``Ansjer.config``.
    The class name keeps its historical spelling so existing imports continue to work.
    """

    def __init__(self):
        self.appUrl = unicomAppUrl
        self.appId = unicomAppId
        self.appSecret = unicomAppSecret
        self.tenantId = unicomTenantId
        self.encodeKey = unicomEncodeKey
        self.ivKey = unicomIvKey
        self.username = unicomUserName
        self.password = unicomPassword
        self.pushKey = unicomPushKey
        # Base headers shared by every request. Treat this dict as read-only:
        # request methods derive their own copy through _build_headers().
        self.headers = {'Tenant': self.tenantId, 'content-type': 'application/x-www-form-urlencoded'}

    def _build_headers(self, authorization):
        """
        Return a fresh headers dict carrying the given Authorization value.

        A copy is returned on purpose: mutating ``self.headers`` in place would let a
        later ``content-type: application/json`` override (used by the POST endpoints)
        leak into the form-encoded token request made by the next call.
        @param authorization: full Authorization header value ("Basic ..." or "Bearer ...")
        @return: new dict based on self.headers
        """
        headers = dict(self.headers)
        headers['Authorization'] = authorization
        return headers

    def _get(self, path, re_params):
        """
        Send an authenticated GET request to a business endpoint.
        @param path: URL path appended to self.appUrl
        @param re_params: dict sent as the query string
        @return: requests.Response
        """
        url = self.appUrl + path
        return requests.get(url, params=re_params, headers=self.business_unify_headers())

    def _post_json(self, path, re_data):
        """
        Send an authenticated POST request whose body is JSON.
        @param path: URL path appended to self.appUrl
        @param re_data: dict serialized with json.dumps as the request body
        @return: requests.Response
        """
        url = self.appUrl + path
        headers = self.business_unify_headers()
        # POST endpoints take JSON, unlike the form-encoded default in self.headers.
        headers['content-type'] = 'application/json'
        return requests.post(url, data=json.dumps(re_data), headers=headers)

    # pip install snowland-smx
    def createSign(self, reverse=False, **sign_params):
        """
        Build the request signature (sign) used to verify API calls.

        Algorithm:
        1. Sort all request parameters (except the signature itself) by key, ascending
           by default. Example: foo=1, bar=2, baz=3 becomes bar=2, baz=3, foo=1.
        2. Join each key and value as key=value (no surrounding spaces) and join the
           pairs with '&', giving bar=2&baz=3&foo=1.
        3. Append '&key=<pushKey>' to the string, hash it with SM3 and upper-case the
           result: (SM3(key1=value1&key2=value2&...&key=pushKey)).upper()

        Parameters whose key or value is falsy (e.g. '', None, 0) are left out of the
        signed string.
        @param reverse: sort keys in descending order when True
        @param sign_params: request parameters to sign
        @return: upper-cased SM3 digest of the assembled string
        """
        ordered = sorted(sign_params.items(), key=lambda item: item[0], reverse=reverse)
        pairs = ["{}={}".format(name, value) for name, value in ordered if name and value]
        val = '&'.join(pairs) + '&key={}'.format(self.pushKey)
        return SM3Util.Hash_sm3(val).upper()

    def get_login_authorization(self):
        """
        Build the Authorization value used for the login (token) request.

        Note that the login Authorization differs from the one used by business
        requests after login.
        Algorithm: base64-encode appId + ":" + appSecret and prefix it with "Basic ".
        @return: "Basic " + base64_data
        """
        voucher = self.appId + ':' + self.appSecret
        base64_data = str(base64.b64encode(voucher.encode("utf-8")), "utf-8")
        return "Basic " + base64_data

    def get_encode_password(self):
        """
        Encrypt the configured password with AES (CBC mode, ZeroPadding).
        @return: encrypt_pwd
        """
        aes = AESencrypt(self.encodeKey.encode('utf-8'), AES.MODE_CBC, self.ivKey.encode('utf-8'),
                         paddingMode="ZeroPadding",
                         characterSet='utf-8')
        encrypt_pwd = aes.encryptFromString(self.password)
        return encrypt_pwd

    def generate_token(self):
        """
        Request an access token with the password grant.
        @return: the 'access_token' value of the JSON response
        @raise KeyError: when the response carries no 'access_token'
        """
        url = self.appUrl + '/auc/oauth/token'
        pwd = self.get_encode_password()
        body = {'username': self.username, 'password': pwd, 'grant_type': 'password', 'scope': 'server'}
        headers = self._build_headers(self.get_login_authorization())
        response_data = requests.post(url, data=body, headers=headers)
        response_data = json.loads(response_data.text)
        return response_data['access_token']

    def refresh_token(self, refresh_token):
        """
        Refresh an access token.
        @param refresh_token: refresh token issued by a previous token request
        @return: raw response body text
        """
        url = self.appUrl + '/auc/oauth/token?grant_type=refresh_token'
        body = {'refresh_token': refresh_token, 'grant_type': 'refresh_token'}
        headers = self._build_headers(self.get_login_authorization())
        response_data = requests.post(url, data=body, headers=headers)
        return response_data.text

    def business_unify_headers(self):
        """
        Build the headers shared by all business requests.

        Adds an Authorization header whose value is "Bearer " + token. A new token is
        requested on every call, and a new dict is returned each time so callers may
        modify it freely.
        @return: headers
        """
        token = self.generate_token()
        return self._build_headers('Bearer ' + token)

    # Business API. Note that GET and POST use different payload types:
    # POST requests send application/json.
    def verify_device(self, **re_params):
        """
        Verify a device.
        @param re_params: query parameters
        @return: requests.Response
        """
        return self._get('/platform/api/device/verify-device', re_params)

    def query_device_status(self, **re_params):
        """
        Query the status of a device.
        @param re_params: query parameters
        @return: requests.Response
        """
        return self._get('/platform/api/device/device-status', re_params)

    def update_device_state(self, **re_data):
        """
        Change the state of a device.
        @param re_data: JSON body fields
        @return: requests.Response
        """
        return self._post_json('/platform/api/device/device-state', re_data)

    def query_device_usage_history(self, **re_params):
        """
        Query the usage history of a device.
        @param re_params: query parameters
        @return: requests.Response
        """
        return self._get('/platform/api/usage/device-usage-history', re_params)

    def query_current_renew_list_usage_details(self, **re_params):
        """
        Query the usage details of a device's current renewal queue.
        @param re_params: query parameters
        @return: requests.Response
        """
        return self._get('/platform/api/usage/current-renewlist-usage-details', re_params)

    def get_device_batch_detail(self, **re_data):
        """
        Call the device batch-detail endpoint.
        @param re_data: JSON body fields
        @return: requests.Response
        """
        return self._post_json('/platform/api/device/batch-detail', re_data)

    def query_package_list(self, **re_params):
        """
        Query the package list.
        @param re_params: query parameters
        @return: requests.Response
        """
        return self._get('/platform/api/package/list', re_params)

    def query_renewal_list(self, **re_params):
        """
        Query the renewal package list.
        @param re_params: query parameters
        @return: requests.Response
        """
        # NOTE(review): this is the same URL as query_package_list - confirm against the
        # interface documentation whether a dedicated renewal endpoint was intended.
        return self._get('/platform/api/package/list', re_params)

    def async_buy_package(self, **re_data):
        """
        Call the async-buy-package endpoint.
        @param re_data: JSON body fields
        @return: requests.Response
        """
        return self._post_json('/platform/api/package/async-buy-package', re_data)

    @staticmethod
    def get_text_dict(result):
        """
        Convert a response into a dict.
        @param result: requests.Response
        @return: parsed JSON body when the status code is 200, otherwise None
        """
        if result.status_code == 200:
            return json.loads(result.text)
        return None

    @staticmethod
    def unicom_flow_usage_cache(key, expire=0, **usage_data):
        """
        Look up device usage history, preferring the Redis cache.

        On a cache miss the API is queried and a successful (HTTP 200) result is stored
        under the key. Failed lookups are not cached, so a transient API error is not
        served back from Redis on later calls.
        @param key: cache key
        @param expire: expiry passed to RedisObject.set_data
        @param usage_data: query parameters
        @return: result dict, or None when the API call did not return HTTP 200
        """
        redis = RedisObject()
        cached = redis.get_data(key)
        if cached:
            return json.loads(cached)
        response = UnicomObjeect().query_device_usage_history(**usage_data)
        usage_history = UnicomObjeect.get_text_dict(response)
        if usage_history is not None:
            redis.set_data(key=key, val=json.dumps(usage_history), expire=expire)
        return usage_history

    @staticmethod
    def get_flow_usage_total(usage_year, usage_month, iccid):
        """
        Return the flow usage recorded for the given month.
        @param usage_year: year to look up
        @param usage_month: month to look up
        @param iccid: Unicom card id
        @return: flow_total_usage, the usage recorded for that month; 0 when the lookup
                 fails or no entry matches
        """
        flow_key = 'ASJ:UNICOM:FLOW:{}'
        usage_data = {'iccid': iccid}
        usage_history = UnicomObjeect.unicom_flow_usage_cache(flow_key.format(iccid), (60 * 10 + 3), **usage_data)
        # Actual total usage for the month.
        flow_total_usage = 0
        if not usage_history or not usage_history.get('success'):
            return flow_total_usage
        # 'data' may be missing or null on error-shaped payloads.
        device_usage_history_list = (usage_history.get('data') or {}).get('deviceUsageHistory') or []
        for item in device_usage_history_list:
            # NOTE(review): compared with ==, so usage_year/usage_month must have the same
            # type as the values in the API response - verify against callers.
            if item.get('year') == usage_year and item.get('month') == usage_month:
                flow_total_usage = item.get('flowTotalUsage', 0)
                break
        return flow_total_usage


if __name__ == '__main__':
    # Ad-hoc manual checks; only executed when the file is run as a script.
    today = datetime.datetime.today()
    year = today.year
    month = today.month
    print(year)
    print(month)
    print(type(year))
    if not '':
        print('为空')
    price = '12.13'
    print(float(price))
    discount = '6'
    dd = Decimal(price) - Decimal(discount)
    print(dd.quantize(Decimal('0.00')))
    unicom_api = UnicomObjeect()
    data = {'foo': 1, 'bar': 2, 'baz': 3}
    print(unicom_api.createSign(**data))
    # result = unicom_api.generate_token()
    # result = unicom_api.refresh_token('5d0c0f30-99bd-4f17-9614-3524495b05d4')
    params = {'iccid': '89860621330065433774', 'status': 2}
    # response = unicom_api.verify_device(**params)
    # response = unicom_api.query_device_status(**params)
    # NOTE(review): this issues a live request that changes the state of the device above.
    response = unicom_api.update_device_state(**params)
    # response = unicom_api.query_device_usage_history(**params)
    # response = unicom_api.query_current_renew_list_usage_details(**params)
    # unicom_api.get_device_batch_detail()
    # response = unicom_api.query_package_list(**params)
    # response = unicom_api.query_renewal_list(**params)
    if response.status_code == 200:
        res_dict = json.loads(response.text)
        print(res_dict)
    response_json = {
        "success": True,
        "msg": "操作成功",
        "code": 0,
        "data": {
            "iccid": "8986062018007784202",
            "completeIccid": "89860620180077842020",
            "status": 1
        }
    }
    print(response_json['data']['iccid'])
    print(response.status_code)