# -*- encoding: utf-8 -*- """ @File : TelecomService.py @Time : 2024/1/24 15:12 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import json import re import time from decimal import Decimal from Ansjer.config import LOGGER from Model.models import UnicomDeviceInfo, LogModel, AccessNumberTaskQueue from Object.RedisObject import RedisObject from Object.TelecomObject import TelecomObject class TelecomService: @classmethod def query_total_usage_by_access_number(cls, iccid, key, expire=600): """ 根据接入号码查询设备总流量使用量(实现缓存) @param iccid: 20位ICCID @param key: 缓存key @param expire: 失效时间 @return: 查询流量结果 """ redis = RedisObject() sim_flow_used_total = redis.get_data(key) if sim_flow_used_total: return Decimal(sim_flow_used_total).quantize(Decimal('0.00')) else: # 查询SIM卡信息 sim_qs = UnicomDeviceInfo.objects.filter(iccid=iccid) if not sim_qs: return None sim_vo = sim_qs.first() telecom = TelecomObject() # 查询电信当月流量用量 data = telecom.query_total_usage_by_date(sim_vo.access_number) if data and data['SvcCont']['resultCode'] == '0' and 'dataCumulationTotal' in data['SvcCont']['result']: cycle_total = data['SvcCont']['result'].get('dataCumulationTotal') else: LOGGER.info(f'query_total_usage_by_access_number查询流量异常,iccid:{iccid}') return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow cycle_total = Decimal(cycle_total).quantize(Decimal('0.00')) n_time = int(time.time()) # 判断数据库周期流量用量 是否大于API查询出来的周期用量 如果是则判定进入了下一个周期 if sim_vo.sim_cycle_used_flow != 0 and sim_vo.sim_cycle_used_flow > cycle_total: sim_used_flow = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow sim_qs.update( sim_used_flow=sim_used_flow, sim_cycle_used_flow=cycle_total, updated_time=n_time ) # 队列用量历史总量 + 上一个周期流量 + 当前周期流量 = 总消耗流量 sim_flow_used_total = sim_used_flow + cycle_total elif cycle_total > sim_vo.sim_cycle_used_flow: # API周期用量大于当前数据库用量则更新记录 sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time) # 队列用量历史总量 + 当前周期流量 = 总消耗流量 sim_flow_used_total = sim_vo.sim_used_flow + cycle_total else: sim_flow_used_total = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow # 缓存当月流量数据 redis.CONN.setnx(key, str(sim_flow_used_total)) redis.CONN.expire(key, expire) return sim_flow_used_total @classmethod def update_access_number_network(cls, iccid, access_number, action_value, reason=''): """ 根据接入号码修改卡网络状态 @param reason: @param iccid: @param access_number: 11位接入号码 @param action_value: ADD:单独添加断网,DEL:单独恢复上网 @return: 修改后结果 None | success """ try: # 获取当前卡号是否有过变更网络状态缓存记录 redis = RedisObject() key = f'ASJ:TELECOM:CHANGE:{iccid}' change_data = redis.get_data(key) action = 1 if action_value == 'ADD' else 2 now_time = int(time.time()) # 变更网络存表数据 data = {'iccid': iccid, 'access_number': access_number, 'type': 2, 'action': action, 'previous_status': 0, 'new_status': 0, 'count': 0, 'reason': reason, 'status': 0, 'created_time': now_time, 'updated_time': now_time} if change_data: c_data = json.loads(change_data) if c_data['actionValue'] == action_value: # 判断是否重复操作 return None AccessNumberTaskQueue.objects.create(**data) LOGGER.info(f'{iccid}未执行变更网络,数据已加入任务队列记录') return None # 获取当前卡号网络状态 接口有调用限制并发 15000次/秒 所以尝试多次调用 now_network_status = 0 i = 0 while i < 3: network_result = cls.get_access_number_network_status(access_number) if network_result: now_network_status = 2 if network_result == 'connect' else 1 break i += 1 if 0 < now_network_status == action: # 当前状态与要变更的状态相同则不调用变更API ip = '127.0.0.1' params = {'action': action_value, 'result': 'No need for changes', 'access_number': access_number} explain = f'{iccid}单独断网或恢复网络' cls.create_operation_log('updateNetworkStatus', ip, params, explain) return None result = TelecomObject().single_cut_net(access_number, action_value) # 更新变更状态 data['previous_status'] = now_network_status data['new_status'] = action data['count'] = 1 cache_data = {'iccid': iccid, 'actionValue': action_value} if not result: # 变更电信卡网络状态 失败处理 data['status'] = 2 data['result'] = {'code': '101007', 'message': '该接入号码高频次受理此业务,调用暂时受限,请稍后再试'} AccessNumberTaskQueue.objects.create(**data) # 失败后保存数据记录,走重试机制 redis.CONN.setnx(key, json.dumps(cache_data)) # 缓存当前卡号130秒不再调用变更网络状态接口,防止调用限制 redis.CONN.expire(key, 130) return None data['status'] = 1 data['completion_time'] = now_time if result == '-5': # 已符合变更后状态 data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'} else: data['result'] = result AccessNumberTaskQueue.objects.create(**data) redis.CONN.setnx(key, json.dumps(cache_data)) redis.CONN.expire(key, 130) return 'success' except Exception as e: LOGGER.info('***{} TelecomService.update_access_number_network:errLine:{}, errMsg:{}' .format(access_number, e.__traceback__.tb_lineno, repr(e))) return None @classmethod def create_operation_log(cls, url, ip, request_dict, describe): """ 创建操作日志 @param url: 请求路径 @param describe: 描述 @param ip: 当前IP @param request_dict: 请求参数 @return: True | False """ try: # 记录操作日志 content = json.loads(json.dumps(request_dict)) log = { 'ip': ip, 'user_id': 1, 'status': 200, 'time': int(time.time()), 'content': json.dumps(content), 'url': url, 'operation': describe, } LogModel.objects.create(**log) return True except Exception as e: print('日志异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return False @classmethod def get_access_number_network_status(cls, access_number): """ 根据access_number查询是否单独断网 @param access_number: 11位接入号码 @return: 当前网络数据 """ try: result = TelecomObject().query_product_information(access_number) if not result: return None if result and result['SvcCont']['resultCode'] == '0': # 获取产品信息 prod_infos = result["SvcCont"]["result"]["prodInfos"] # 获取 funProdInfos 列表 fun_prod_infos = prod_infos["funProdInfos"] # 遍历 funProdInfos 列表 for prod_info in fun_prod_infos: # 获取是否已单独断网信息 if isinstance(prod_info, dict) and "productName" in prod_info: is_disconnect = prod_info.get("productName") if "是否已单独断网" in is_disconnect: is_disconnect_value = re.search(r"(?<=:).+", is_disconnect).group(0) # 进行判断 if is_disconnect_value == "否": return 'connect' else: return 'disconnect' return None except Exception as e: print('***{}get_access_number_network_status,errLine:{}, errMsg:{}' .format(access_number, e.__traceback__.tb_lineno, repr(e))) return None