123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- # -*- encoding: utf-8 -*-
- """
- @File : TelecomService.py
- @Time : 2024/1/24 15:12
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import json
- import re
- import time
- from decimal import Decimal
- from Ansjer.config import LOGGER
- from Model.models import UnicomDeviceInfo, LogModel, AccessNumberTaskQueue
- from Object.RedisObject import RedisObject
- from Object.TelecomObject import TelecomObject
- class TelecomService:
- @classmethod
- def query_total_usage_by_access_number(cls, iccid, key, expire=600):
- """
- 根据接入号码查询设备总流量使用量(实现缓存)
- @param iccid: 20位ICCID
- @param key: 缓存key
- @param expire: 失效时间
- @return: 查询流量结果
- """
- redis = RedisObject()
- sim_flow_used_total = redis.get_data(key)
- if sim_flow_used_total:
- return Decimal(sim_flow_used_total).quantize(Decimal('0.00'))
- else:
- # 查询SIM卡信息
- sim_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
- if not sim_qs:
- return None
- sim_vo = sim_qs.first()
- telecom = TelecomObject()
- # 查询电信当月流量用量
- data = telecom.query_total_usage_by_date(sim_vo.access_number)
- if data and data['SvcCont']['resultCode'] == '0' and 'dataCumulationTotal' in data['SvcCont']['result']:
- cycle_total = data['SvcCont']['result'].get('dataCumulationTotal')
- else:
- LOGGER.info(f'query_total_usage_by_access_number查询流量异常,iccid:{iccid}')
- return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- cycle_total = Decimal(cycle_total).quantize(Decimal('0.00'))
- n_time = int(time.time())
- # 判断数据库周期流量用量 是否大于API查询出来的周期用量 如果是则判定进入了下一个周期
- if sim_vo.sim_cycle_used_flow != 0 and sim_vo.sim_cycle_used_flow > cycle_total:
- sim_used_flow = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- sim_qs.update(
- sim_used_flow=sim_used_flow,
- sim_cycle_used_flow=cycle_total,
- updated_time=n_time
- )
- # 队列用量历史总量 + 上一个周期流量 + 当前周期流量 = 总消耗流量
- sim_flow_used_total = sim_used_flow + cycle_total
- elif cycle_total > sim_vo.sim_cycle_used_flow: # API周期用量大于当前数据库用量则更新记录
- sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time)
- # 队列用量历史总量 + 当前周期流量 = 总消耗流量
- sim_flow_used_total = sim_vo.sim_used_flow + cycle_total
- else:
- sim_flow_used_total = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- # 缓存当月流量数据
- redis.CONN.setnx(key, str(sim_flow_used_total))
- redis.CONN.expire(key, expire)
- return sim_flow_used_total
- @classmethod
- def update_access_number_network(cls, iccid, access_number, action_value, reason=''):
- """
- 根据接入号码修改卡网络状态
- @param reason:
- @param iccid:
- @param access_number: 11位接入号码
- @param action_value: ADD:单独添加断网,DEL:单独恢复上网
- @return: 修改后结果 None | success
- """
- try:
- # 获取当前卡号是否有过变更网络状态缓存记录
- redis = RedisObject()
- key = f'ASJ:TELECOM:CHANGE:{iccid}'
- change_data = redis.get_data(key)
- action = 1 if action_value == 'ADD' else 2
- now_time = int(time.time())
- # 变更网络存表数据
- data = {'iccid': iccid, 'access_number': access_number, 'type': 2, 'action': action, 'previous_status': 0,
- 'new_status': 0, 'count': 0, 'reason': reason,
- 'status': 0, 'created_time': now_time, 'updated_time': now_time}
- if change_data:
- c_data = json.loads(change_data)
- if c_data['actionValue'] == action_value: # 判断是否重复操作
- return None
- AccessNumberTaskQueue.objects.create(**data)
- LOGGER.info(f'{iccid}未执行变更网络,数据已加入任务队列记录')
- return None
- # 获取当前卡号网络状态 接口有调用限制并发 15000次/秒 所以尝试多次调用
- now_network_status = 0
- i = 0
- while i < 3:
- network_result = cls.get_access_number_network_status(access_number)
- if network_result:
- now_network_status = 2 if network_result == 'connect' else 1
- break
- i += 1
- if 0 < now_network_status == action: # 当前状态与要变更的状态相同则不调用变更API
- ip = '127.0.0.1'
- params = {'action': action_value, 'result': 'No need for changes', 'access_number': access_number}
- explain = f'{iccid}单独断网或恢复网络'
- cls.create_operation_log('updateNetworkStatus', ip, params, explain)
- return None
- result = TelecomObject().single_cut_net(access_number, action_value)
- # 更新变更状态
- data['previous_status'] = now_network_status
- data['new_status'] = action
- data['count'] = 1
- cache_data = {'iccid': iccid, 'actionValue': action_value}
- if not result: # 变更电信卡网络状态 失败处理
- data['status'] = 2
- data['result'] = {'code': '101007', 'message': '该接入号码高频次受理此业务,调用暂时受限,请稍后再试'}
- AccessNumberTaskQueue.objects.create(**data) # 失败后保存数据记录,走重试机制
- redis.CONN.setnx(key, json.dumps(cache_data)) # 缓存当前卡号130秒不再调用变更网络状态接口,防止调用限制
- redis.CONN.expire(key, 130)
- return None
- data['status'] = 1
- data['completion_time'] = now_time
- if result == '-5': # 已符合变更后状态
- data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'}
- else:
- data['result'] = result
- AccessNumberTaskQueue.objects.create(**data)
- redis.CONN.setnx(key, json.dumps(cache_data))
- redis.CONN.expire(key, 130)
- return 'success'
- except Exception as e:
- LOGGER.info('***{} TelecomService.update_access_number_network:errLine:{}, errMsg:{}'
- .format(access_number, e.__traceback__.tb_lineno, repr(e)))
- return None
- @classmethod
- def create_operation_log(cls, url, ip, request_dict, describe):
- """
- 创建操作日志
- @param url: 请求路径
- @param describe: 描述
- @param ip: 当前IP
- @param request_dict: 请求参数
- @return: True | False
- """
- try:
- # 记录操作日志
- content = json.loads(json.dumps(request_dict))
- log = {
- 'ip': ip,
- 'user_id': 1,
- 'status': 200,
- 'time': int(time.time()),
- 'content': json.dumps(content),
- 'url': url,
- 'operation': describe,
- }
- LogModel.objects.create(**log)
- return True
- except Exception as e:
- print('日志异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return False
- @classmethod
- def get_access_number_network_status(cls, access_number):
- """
- 根据access_number查询是否单独断网
- @param access_number: 11位接入号码
- @return: 当前网络数据
- """
- try:
- result = TelecomObject().query_product_information(access_number)
- if not result:
- return None
- if result and result['SvcCont']['resultCode'] == '0':
- # 获取产品信息
- prod_infos = result["SvcCont"]["result"]["prodInfos"]
- # 获取 funProdInfos 列表
- fun_prod_infos = prod_infos["funProdInfos"]
- # 遍历 funProdInfos 列表
- for prod_info in fun_prod_infos:
- # 获取是否已单独断网信息
- if isinstance(prod_info, dict) and "productName" in prod_info:
- is_disconnect = prod_info.get("productName")
- if "是否已单独断网" in is_disconnect:
- is_disconnect_value = re.search(r"(?<=:).+", is_disconnect).group(0)
- # 进行判断
- if is_disconnect_value == "否":
- return 'connect'
- else:
- return 'disconnect'
- return None
- except Exception as e:
- print('***{}get_access_number_network_status,errLine:{}, errMsg:{}'
- .format(access_number, e.__traceback__.tb_lineno, repr(e)))
- return None
|