| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 | # -*- encoding: utf-8 -*-"""@File    : TelecomService.py@Time    : 2024/1/24 15:12@Author  : stephen@Email   : zhangdongming@asj6.wecom.work@Software: PyCharm"""import jsonimport reimport timefrom decimal import Decimalfrom Ansjer.config import LOGGERfrom Model.models import UnicomDeviceInfo, LogModel, AccessNumberTaskQueuefrom Object.RedisObject import RedisObjectfrom Object.TelecomObject import TelecomObjectclass TelecomService:    @classmethod    def query_total_usage_by_access_number(cls, iccid, key, expire=600):        """        根据接入号码查询设备总流量使用量(实现缓存)        @param iccid: 20位ICCID        @param key: 缓存key        @param expire: 失效时间        @return: 查询流量结果        """        redis = RedisObject()        sim_flow_used_total = redis.get_data(key)        if sim_flow_used_total:            return Decimal(sim_flow_used_total).quantize(Decimal('0.00'))        else:            # 查询SIM卡信息            sim_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)            if not sim_qs:                return None            sim_vo = sim_qs.first()            telecom = TelecomObject()            # 查询电信当月流量用量            data = telecom.query_total_usage_by_date(sim_vo.access_number)            if data and data['SvcCont']['resultCode'] == '0' and 'dataCumulationTotal' in data['SvcCont']['result']:                cycle_total = data['SvcCont']['result'].get('dataCumulationTotal')            else:                LOGGER.info(f'query_total_usage_by_access_number查询流量异常,iccid:{iccid}')                return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow            cycle_total = Decimal(cycle_total).quantize(Decimal('0.00'))            n_time = int(time.time())            # 判断数据库周期流量用量 是否大于API查询出来的周期用量 如果是则判定进入了下一个周期            if sim_vo.sim_cycle_used_flow != 0 and sim_vo.sim_cycle_used_flow > cycle_total:                sim_used_flow = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow                sim_qs.update(                    sim_used_flow=sim_used_flow,                    sim_cycle_used_flow=cycle_total,                    updated_time=n_time                )                # 队列用量历史总量 + 上一个周期流量 + 当前周期流量 = 总消耗流量                sim_flow_used_total = sim_used_flow + cycle_total            elif cycle_total > sim_vo.sim_cycle_used_flow:  # API周期用量大于当前数据库用量则更新记录                sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time)                # 队列用量历史总量 + 当前周期流量 = 总消耗流量                sim_flow_used_total = sim_vo.sim_used_flow + cycle_total            else:                sim_flow_used_total = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow            # 缓存当月流量数据            redis.CONN.setnx(key, str(sim_flow_used_total))            redis.CONN.expire(key, expire)        return sim_flow_used_total    @classmethod    def update_access_number_network(cls, iccid, access_number, action_value, reason=''):        """        根据接入号码修改卡网络状态        @param reason:        @param iccid:        @param access_number: 11位接入号码        @param action_value: ADD:单独添加断网,DEL:单独恢复上网        @return: 修改后结果 None | success        """        try:            # 获取当前卡号是否有过变更网络状态缓存记录            redis = RedisObject()            key = f'ASJ:TELECOM:CHANGE:{iccid}'            change_data = redis.get_data(key)            action = 1 if action_value == 'ADD' else 2            now_time = int(time.time())            # 变更网络存表数据            data = {'iccid': iccid, 'access_number': access_number, 'type': 2, 'action': action, 'previous_status': 0,                    'new_status': 0, 'count': 0, 'reason': reason,                    'status': 0, 'created_time': now_time, 'updated_time': now_time}            if change_data:                c_data = json.loads(change_data)                if c_data['actionValue'] == action_value:  # 判断是否重复操作                    return None                AccessNumberTaskQueue.objects.create(**data)                LOGGER.info(f'{iccid}未执行变更网络,数据已加入任务队列记录')                return None            # 获取当前卡号网络状态 接口有调用限制并发 15000次/秒 所以尝试多次调用            now_network_status = 0            i = 0            while i < 3:                network_result = cls.get_access_number_network_status(access_number)                if network_result:                    now_network_status = 2 if network_result == 'connect' else 1                    break                i += 1            if 0 < now_network_status == action:  # 当前状态与要变更的状态相同则不调用变更API                ip = '127.0.0.1'                params = {'action': action_value, 'result': 'No need for changes', 'access_number': access_number}                explain = f'{iccid}单独断网或恢复网络'                cls.create_operation_log('updateNetworkStatus', ip, params, explain)                return None            result = TelecomObject().single_cut_net(access_number, action_value)            # 更新变更状态            data['previous_status'] = now_network_status            data['new_status'] = action            data['count'] = 1            cache_data = {'iccid': iccid, 'actionValue': action_value}            if not result:  # 变更电信卡网络状态 失败处理                data['status'] = 2                data['result'] = {'code': '101007', 'message': '该接入号码高频次受理此业务,调用暂时受限,请稍后再试'}                AccessNumberTaskQueue.objects.create(**data)  # 失败后保存数据记录,走重试机制                redis.CONN.setnx(key, json.dumps(cache_data))  # 缓存当前卡号130秒不再调用变更网络状态接口,防止调用限制                redis.CONN.expire(key, 130)                return None            data['status'] = 1            data['completion_time'] = now_time            if result == '-5':  # 已符合变更后状态                data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'}            else:                data['result'] = result            AccessNumberTaskQueue.objects.create(**data)            redis.CONN.setnx(key, json.dumps(cache_data))            redis.CONN.expire(key, 130)            return 'success'        except Exception as e:            LOGGER.info('***{} TelecomService.update_access_number_network:errLine:{}, errMsg:{}'                        .format(access_number, e.__traceback__.tb_lineno, repr(e)))            return None    @classmethod    def create_operation_log(cls, url, ip, request_dict, describe):        """        创建操作日志        @param url: 请求路径        @param describe: 描述        @param ip: 当前IP        @param request_dict: 请求参数        @return: True | False        """        try:            # 记录操作日志            content = json.loads(json.dumps(request_dict))            log = {                'ip': ip,                'user_id': 1,                'status': 200,                'time': int(time.time()),                'content': json.dumps(content),                'url': url,                'operation': describe,            }            LogModel.objects.create(**log)            return True        except Exception as e:            print('日志异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))            return False    @classmethod    def get_access_number_network_status(cls, access_number):        """        根据access_number查询是否单独断网        @param access_number: 11位接入号码        @return: 当前网络数据        """        try:            result = TelecomObject().query_product_information(access_number)            if not result:                return None            if result and result['SvcCont']['resultCode'] == '0':                # 获取产品信息                prod_infos = result["SvcCont"]["result"]["prodInfos"]                # 获取 funProdInfos 列表                fun_prod_infos = prod_infos["funProdInfos"]                # 遍历 funProdInfos 列表                for prod_info in fun_prod_infos:                    # 获取是否已单独断网信息                    if isinstance(prod_info, dict) and "productName" in prod_info:                        is_disconnect = prod_info.get("productName")                        if "是否已单独断网" in is_disconnect:                            is_disconnect_value = re.search(r"(?<=:).+", is_disconnect).group(0)                            # 进行判断                            if is_disconnect_value == "否":                                return 'connect'                            else:                                return 'disconnect'            return None        except Exception as e:            print('***{}get_access_number_network_status,errLine:{}, errMsg:{}'                  .format(access_number, e.__traceback__.tb_lineno, repr(e)))            return None
 |