| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- # -*- encoding: utf-8 -*-
- """
- @File : QuecCloudService.py
- @Time : 2025/11/19 15:13
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import json
- import time
- from decimal import Decimal
- from typing import Any, Optional
- from django.conf import settings
- from Ansjer.config import LOGGER
- from Model.models import UnicomDeviceInfo
- from Object.QuecCloudObject import QuecCloudApiClient
- from Object.RedisObject import RedisObject
- # 移远AK/SK
- QUEC_CLOUD_AK = settings.QUEC_CLOUD_AK
- QUEC_CLOUD_SK = settings.QUEC_CLOUD_SK
- QUEC_CLIENT = QuecCloudApiClient(
- app_key=QUEC_CLOUD_AK,
- secret=QUEC_CLOUD_SK
- )
- # 定义统一的 Key
- REDIS_FAIL_LOG_KEY = "asj:sim_card:op_failures"
- # 实例化你的 Redis 类 (确保配置正确)
- redis_client = RedisObject()
- MAX_LOG_LENGTH = 1000
- class QuecCloudService:
- @staticmethod
- def get_flow_total_usage(key: str, expire: int = 600, **usage_data: Any) -> int | Decimal:
- """
- 获取设备当前队列的流量用量详情(带缓存功能)
- 该函数用于查询指定设备的流量使用情况,并通过缓存机制减少重复查询,
- 提高系统性能。缓存时效由expire参数控制,** usage_data用于传递查询所需的额外参数。
- Args:
- key (str): 缓存的唯一标识键,通常与设备唯一ID相关联
- expire (int, optional): 缓存失效时间(秒),默认值为600秒(10分钟)
- **usage_data (Any): 可变关键字参数,用于传递额外查询参数,如iccid、imei等
- Returns:
- Optional[Decimal]: 流量用量详情,用量数据(如已用流量),
- 查询失败或无数据时返回数据库已历史流量记录
- Example:
- >>> QuecCloudService.get_flow_total_usage("device_123", 600, iccid="8986112321603574202")
- return "39.13"
- """
- redis = RedisObject()
- sim_flow_used_total = redis.get_data(key)
- if sim_flow_used_total:
- return Decimal(sim_flow_used_total).quantize(Decimal('0.00'))
- else:
- # 检查 iccid 是否存在
- if 'iccid' not in usage_data:
- LOGGER.error('缺少必要参数: iccid')
- return Decimal('0.00')
- # 查询SIM卡信息
- sim_qs = UnicomDeviceInfo.objects.filter(iccid=usage_data['iccid'])
- if not sim_qs:
- return Decimal('0.00')
- sim_vo = sim_qs.first()
- try:
- # 异步调用占位(需根据实际框架扩展)
- card_info = QUEC_CLIENT.query_single_card_info(iccid=usage_data['iccid'])
- cycle_total = card_info.get('usedflow')
- if cycle_total is None:
- LOGGER.error(f'移远查询流量异常: 未返回 usedflow 字段, iccid:{usage_data["iccid"]}')
- return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- except Exception as e:
- LOGGER.error(f'移远查询流量异常,iccid:{usage_data["iccid"]},error:{str(e)}')
- return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- cycle_total = Decimal(cycle_total).quantize(Decimal('0.00'))
- n_time = int(time.time())
- # 判断数据库周期流量用量 是否大于API查询出来的周期用量 如果是则判定进入了下一个周期
- if sim_vo.sim_cycle_used_flow != 0 and sim_vo.sim_cycle_used_flow > cycle_total:
- sim_used_flow = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- sim_qs.update(
- sim_used_flow=sim_used_flow,
- sim_cycle_used_flow=cycle_total,
- updated_time=n_time
- )
- # 队列用量历史总量 + 上一个周期流量 + 当前周期流量 = 总消耗流量
- sim_flow_used_total = sim_used_flow + cycle_total
- elif cycle_total > sim_vo.sim_cycle_used_flow: # API周期用量大于当前数据库用量则更新记录
- sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time)
- # 队列用量历史总量 + 当前周期流量 = 总消耗流量
- sim_flow_used_total = sim_vo.sim_used_flow + cycle_total
- else:
- sim_flow_used_total = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
- redis.CONN.setnx(key, str(sim_flow_used_total))
- redis.CONN.expire(key, expire)
- return sim_flow_used_total
- @staticmethod
- def resume_card(iccid: str) -> Optional[bool]:
- """
- 恢复 SIM 卡功能(激活已停用的 SIM 卡)
- """
- action_name = "激活"
- try:
- VALID_QUEC_STATUSES = {'正常', '测试期正常', '待激活'}
- quec_card_info = QUEC_CLIENT.query_single_card_info(iccid=iccid)
- if quec_card_info:
- quec_status = quec_card_info.get('status')
- gprsstatus = quec_card_info.get('gprsstatus')
- if quec_status not in VALID_QUEC_STATUSES:
- QUEC_CLIENT.resume_single_card(iccid=iccid)
- elif gprsstatus == '0':
- result = QUEC_CLIENT.open_gprs_service(iccid=iccid)
- LOGGER.info(f"{iccid}移远卡GPRS启动结果: {result}")
- # 如果 API 返回 False/None,视作失败进行记录
- if not result:
- QuecCloudService.record_sim_failure(iccid, action_name, "API返回操作失败或未响应")
- return True
- except Exception as e:
- error_msg = str(e)
- LOGGER.error(f"{iccid}移远卡恢复失败: {error_msg}")
- # === 【新增】记录到 Redis 供运营查看 ===
- QuecCloudService.record_sim_failure(iccid, action_name, error_msg)
- return False
- @staticmethod
- def suspend_single_card(iccid: str) -> Optional[bool]:
- """
- 停用 SIM 卡功能(停用 SIM 卡)
- """
- action_name = "停用"
- try:
- result = QUEC_CLIENT.close_gprs_service(iccid=iccid)
- LOGGER.info(f"{iccid}移远卡GPRS停机结果: {result}")
- # 如果 API 返回 False/None,视作失败进行记录
- if not result:
- QuecCloudService.record_sim_failure(iccid, action_name, "API返回操作失败或未响应")
- return result
- except Exception as e:
- error_msg = str(e)
- LOGGER.error(f"{iccid}移远卡停机失败: {error_msg}")
- # === 【新增】记录到 Redis 供运营查看 ===
- QuecCloudService.record_sim_failure(iccid, action_name, error_msg)
- return False
- @staticmethod
- def is_quec_cloud_sim(iccid: Optional[str] = None):
- """
- 单卡查询信息接口
- Args:
- iccid (Optional[str]): 集成电路卡识别码
- Returns:
- bool: 操作是否成功
- Raises:
- ValueError: 当msisdn和iccid都未提供时抛出
- """
- try:
- iccid = iccid[0:19]
- card_info = QUEC_CLIENT.query_single_card_info(iccid=iccid)
- LOGGER.info(f"{iccid}查询是否是QUEC云卡 状态:{card_info.get('status')},"
- f"已用流量:{card_info.get('usedflow')},套餐类型:{card_info.get('setmealType')}")
- return True
- except Exception as e:
- LOGGER.error(f"{iccid}单卡信息查询失败: {str(e)}")
- return False
- @staticmethod
- def query_quec_cloud_sim_info(iccid: Optional[str] = None) -> Optional[dict]:
- """
- 单卡查询信息接口
- Args:
- iccid (Optional[str]): 集成电路卡识别码
- Raises:
- ValueError: 当msisdn和iccid都未提供时抛出
- """
- try:
- iccid = iccid[0:19]
- return QUEC_CLIENT.query_single_card_info(iccid=iccid)
- except Exception as e:
- LOGGER.error(f"{iccid}单卡信息查询失败: {str(e)}")
- return None
- @staticmethod
- def record_sim_failure(iccid: str, action_type: str, error_msg: str):
- """
- 记录失败操作到 Redis List,并自动清理旧数据
- """
- try:
- # 1. 准备数据
- current_time = int(time.time())
- log_data = {
- "iccid": iccid,
- "action": action_type,
- "error": str(error_msg),
- "time": current_time,
- "status": "pending"
- }
- val = json.dumps(log_data, ensure_ascii=False)
- # 2. 写入 Redis (rpush: 从右边插入)
- # 注意:这里使用你 RedisObject 实例化的对象,假设变量名叫 redis_client
- redis_client.rpush(REDIS_FAIL_LOG_KEY, val)
- # ==========================================
- # 3. 防爆逻辑 (放在这里)
- # ==========================================
- # 获取当前长度
- current_len = redis_client.llen(REDIS_FAIL_LOG_KEY)
- # 如果超过最大长度,从左边弹出最老的数据 (lpop)
- if current_len and current_len > MAX_LOG_LENGTH:
- # 循环弹出,直到长度符合要求(防止并发写入时瞬间超出很多)
- # 这里简单点只弹出一个也可以,通常够用了
- redis_client.lpop(REDIS_FAIL_LOG_KEY)
- except Exception as e:
- # 记录日志本身如果报错,打印一下,不要影响主业务
- LOGGER.error(f"写入Redis失败日志出错: {str(e)}")
- @staticmethod
- def get_failure_logs(page=1, page_size=20):
- """
- 获取失败记录列表(支持简单分页)
- Redis List 下标从 0 开始
- """
- start = (page - 1) * page_size
- end = start + page_size - 1
- # 使用 RedisObject 的 lrange 方法
- # 注意:lrange 获取的是 字节(bytes) 或 字符串,需要反序列化
- raw_list = redis_client.lrange(REDIS_FAIL_LOG_KEY, start, end)
- formatted_list = []
- if raw_list:
- for item in raw_list:
- try:
- # 如果 item 是 bytes,需要 decode,如果是 str 则不需要
- if isinstance(item, bytes):
- item = item.decode('utf-8')
- formatted_list.append(json.loads(item))
- except Exception as e:
- LOGGER.error(f"反序列化失败日志出错: {str(e)}")
- continue
- return formatted_list
|