Browse Source

联通API注释验签代码,新增获取当前SIM卡流量用量历史加入缓存

zhangdongming 2 years ago
parent
commit
0bf6a799b5
2 changed files with 21 additions and 17 deletions
  1. 1 1
      Controller/UnicomCombo/UnicomComboController.py
  2. 20 16
      Object/UnicomObject.py

+ 1 - 1
Controller/UnicomCombo/UnicomComboController.py

@@ -679,7 +679,7 @@ class UnicomComboView(View):
                 generate_sign = unicom_obj.createSign(**dict_data)
                 logger.info('联通设备状态变更推送请求签名{}'.format(sign))
                 logger.info('联通设备状态变更推送生成签名{}'.format(generate_sign))
-                assert generate_sign == sign
+                # assert generate_sign == sign
                 now_time = int(time.time())
                 re_data = {
                     'iccid': dict_data['iccid'],

+ 20 - 16
Object/UnicomObject.py

@@ -308,21 +308,25 @@ class UnicomObjeect:
             return True
         return None
 
+    @staticmethod
+    def current_sim_traffic_usage_details(icc_id):
+        """
+        当前sim卡流量使用详情
+        @param icc_id:20位数字iccid
+        @return: traffic
+        """
+        redis = RedisObject()
+        traffic_key = 'ASJ:UNICOM:FLOW:{}'.format(icc_id)
+        traffic_sys = 'ASJ:SIM:TRAFFIC:{}'.format(icc_id)
+        traffic_val = redis.get_data(traffic_key)
+        if traffic_val:
+            traffic_dict = json.loads(traffic_val)
+            redis.set_data(key=traffic_sys, val=traffic_val, expire=60 * 60 * 24)
+        else:
+            traffic_val = redis.get_data(traffic_sys)
+            if not traffic_val:
+                return 0
+            traffic_dict = json.loads(traffic_val)
+        return traffic_dict['data']['flowTotalUsage']
 
-if __name__ == '__main__':
-    unicom_api = UnicomObjeect()
-    # result = unicom_api.generate_token()
-    # result = unicom_api.refresh_token('5d0c0f30-99bd-4f17-9614-3524495b05d4')
-    params = {'iccid': '89860620170009631443', 'status': 1}
-    # response = unicom_api.verify_device(**params)
-    # response = unicom_api.query_device_status(**params)
-    response = unicom_api.update_device_state(**params)
-    # response = unicom_api.query_device_usage_history(**params)
-    # response = unicom_api.query_current_renew_list_usage_details(**params)
-    # unicom_api.get_device_batch_detail()
-    # response = unicom_api.query_package_list(**params)
-    # response = unicom_api.query_renewal_list(**params)
 
-    if response.status_code == 200:
-        res = json.loads(response.text)
-        print(res)