QuecCloudService.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : QuecCloudService.py
  4. @Time : 2025/11/19 15:13
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import json
  10. import time
  11. from decimal import Decimal
  12. from typing import Any, Optional
  13. from django.conf import settings
  14. from Ansjer.config import LOGGER
  15. from Model.models import UnicomDeviceInfo
  16. from Object.QuecCloudObject import QuecCloudApiClient
  17. from Object.RedisObject import RedisObject
  18. # 移远AK/SK
  19. QUEC_CLOUD_AK = settings.QUEC_CLOUD_AK
  20. QUEC_CLOUD_SK = settings.QUEC_CLOUD_SK
  21. QUEC_CLIENT = QuecCloudApiClient(
  22. app_key=QUEC_CLOUD_AK,
  23. secret=QUEC_CLOUD_SK
  24. )
  25. # 定义统一的 Key
  26. REDIS_FAIL_LOG_KEY = "asj:sim_card:op_failures"
  27. # 实例化你的 Redis 类 (确保配置正确)
  28. redis_client = RedisObject()
  29. MAX_LOG_LENGTH = 1000
  30. class QuecCloudService:
  31. @staticmethod
  32. def get_flow_total_usage(key: str, expire: int = 600, **usage_data: Any) -> int | Decimal:
  33. """
  34. 获取设备当前队列的流量用量详情(带缓存功能)
  35. 该函数用于查询指定设备的流量使用情况,并通过缓存机制减少重复查询,
  36. 提高系统性能。缓存时效由expire参数控制,** usage_data用于传递查询所需的额外参数。
  37. Args:
  38. key (str): 缓存的唯一标识键,通常与设备唯一ID相关联
  39. expire (int, optional): 缓存失效时间(秒),默认值为600秒(10分钟)
  40. **usage_data (Any): 可变关键字参数,用于传递额外查询参数,如iccid、imei等
  41. Returns:
  42. Optional[Decimal]: 流量用量详情,用量数据(如已用流量),
  43. 查询失败或无数据时返回数据库已历史流量记录
  44. Example:
  45. >>> QuecCloudService.get_flow_total_usage("device_123", 600, iccid="8986112321603574202")
  46. return "39.13"
  47. """
  48. redis = RedisObject()
  49. sim_flow_used_total = redis.get_data(key)
  50. if sim_flow_used_total:
  51. return Decimal(sim_flow_used_total).quantize(Decimal('0.00'))
  52. else:
  53. # 检查 iccid 是否存在
  54. if 'iccid' not in usage_data:
  55. LOGGER.error('缺少必要参数: iccid')
  56. return Decimal('0.00')
  57. # 查询SIM卡信息
  58. sim_qs = UnicomDeviceInfo.objects.filter(iccid=usage_data['iccid'])
  59. if not sim_qs:
  60. return Decimal('0.00')
  61. sim_vo = sim_qs.first()
  62. try:
  63. # 异步调用占位(需根据实际框架扩展)
  64. card_info = QUEC_CLIENT.query_single_card_info(iccid=usage_data['iccid'])
  65. cycle_total = card_info.get('usedflow')
  66. if cycle_total is None:
  67. LOGGER.error(f'移远查询流量异常: 未返回 usedflow 字段, iccid:{usage_data["iccid"]}')
  68. return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
  69. except Exception as e:
  70. LOGGER.error(f'移远查询流量异常,iccid:{usage_data["iccid"]},error:{str(e)}')
  71. return sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
  72. cycle_total = Decimal(cycle_total).quantize(Decimal('0.00'))
  73. n_time = int(time.time())
  74. # 判断数据库周期流量用量 是否大于API查询出来的周期用量 如果是则判定进入了下一个周期
  75. if sim_vo.sim_cycle_used_flow != 0 and sim_vo.sim_cycle_used_flow > cycle_total:
  76. sim_used_flow = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
  77. sim_qs.update(
  78. sim_used_flow=sim_used_flow,
  79. sim_cycle_used_flow=cycle_total,
  80. updated_time=n_time
  81. )
  82. # 队列用量历史总量 + 上一个周期流量 + 当前周期流量 = 总消耗流量
  83. sim_flow_used_total = sim_used_flow + cycle_total
  84. elif cycle_total > sim_vo.sim_cycle_used_flow: # API周期用量大于当前数据库用量则更新记录
  85. sim_qs.update(sim_cycle_used_flow=cycle_total, updated_time=n_time)
  86. # 队列用量历史总量 + 当前周期流量 = 总消耗流量
  87. sim_flow_used_total = sim_vo.sim_used_flow + cycle_total
  88. else:
  89. sim_flow_used_total = sim_vo.sim_used_flow + sim_vo.sim_cycle_used_flow
  90. redis.CONN.setnx(key, str(sim_flow_used_total))
  91. redis.CONN.expire(key, expire)
  92. return sim_flow_used_total
  93. @staticmethod
  94. def resume_card(iccid: str) -> Optional[bool]:
  95. """
  96. 恢复 SIM 卡功能(激活已停用的 SIM 卡)
  97. """
  98. action_name = "激活"
  99. try:
  100. VALID_QUEC_STATUSES = {'正常', '测试期正常', '待激活'}
  101. quec_card_info = QUEC_CLIENT.query_single_card_info(iccid=iccid)
  102. if quec_card_info:
  103. quec_status = quec_card_info.get('status')
  104. gprsstatus = quec_card_info.get('gprsstatus')
  105. if quec_status not in VALID_QUEC_STATUSES:
  106. QUEC_CLIENT.resume_single_card(iccid=iccid)
  107. elif gprsstatus == '0':
  108. result = QUEC_CLIENT.open_gprs_service(iccid=iccid)
  109. LOGGER.info(f"{iccid}移远卡GPRS启动结果: {result}")
  110. # 如果 API 返回 False/None,视作失败进行记录
  111. if not result:
  112. QuecCloudService.record_sim_failure(iccid, action_name, "API返回操作失败或未响应")
  113. return True
  114. except Exception as e:
  115. error_msg = str(e)
  116. LOGGER.error(f"{iccid}移远卡恢复失败: {error_msg}")
  117. # === 【新增】记录到 Redis 供运营查看 ===
  118. QuecCloudService.record_sim_failure(iccid, action_name, error_msg)
  119. return False
  120. @staticmethod
  121. def suspend_single_card(iccid: str) -> Optional[bool]:
  122. """
  123. 停用 SIM 卡功能(停用 SIM 卡)
  124. """
  125. action_name = "停用"
  126. try:
  127. result = QUEC_CLIENT.close_gprs_service(iccid=iccid)
  128. LOGGER.info(f"{iccid}移远卡GPRS停机结果: {result}")
  129. # 如果 API 返回 False/None,视作失败进行记录
  130. if not result:
  131. QuecCloudService.record_sim_failure(iccid, action_name, "API返回操作失败或未响应")
  132. return result
  133. except Exception as e:
  134. error_msg = str(e)
  135. LOGGER.error(f"{iccid}移远卡停机失败: {error_msg}")
  136. # === 【新增】记录到 Redis 供运营查看 ===
  137. QuecCloudService.record_sim_failure(iccid, action_name, error_msg)
  138. return False
  139. @staticmethod
  140. def is_quec_cloud_sim(iccid: Optional[str] = None):
  141. """
  142. 单卡查询信息接口
  143. Args:
  144. iccid (Optional[str]): 集成电路卡识别码
  145. Returns:
  146. bool: 操作是否成功
  147. Raises:
  148. ValueError: 当msisdn和iccid都未提供时抛出
  149. """
  150. try:
  151. iccid = iccid[0:19]
  152. card_info = QUEC_CLIENT.query_single_card_info(iccid=iccid)
  153. LOGGER.info(f"{iccid}查询是否是QUEC云卡 状态:{card_info.get('status')},"
  154. f"已用流量:{card_info.get('usedflow')},套餐类型:{card_info.get('setmealType')}")
  155. return True
  156. except Exception as e:
  157. LOGGER.error(f"{iccid}单卡信息查询失败: {str(e)}")
  158. return False
  159. @staticmethod
  160. def query_quec_cloud_sim_info(iccid: Optional[str] = None) -> Optional[dict]:
  161. """
  162. 单卡查询信息接口
  163. Args:
  164. iccid (Optional[str]): 集成电路卡识别码
  165. Raises:
  166. ValueError: 当msisdn和iccid都未提供时抛出
  167. """
  168. try:
  169. iccid = iccid[0:19]
  170. return QUEC_CLIENT.query_single_card_info(iccid=iccid)
  171. except Exception as e:
  172. LOGGER.error(f"{iccid}单卡信息查询失败: {str(e)}")
  173. return None
  174. @staticmethod
  175. def record_sim_failure(iccid: str, action_type: str, error_msg: str):
  176. """
  177. 记录失败操作到 Redis List,并自动清理旧数据
  178. """
  179. try:
  180. # 1. 准备数据
  181. current_time = int(time.time())
  182. log_data = {
  183. "iccid": iccid,
  184. "action": action_type,
  185. "error": str(error_msg),
  186. "time": current_time,
  187. "status": "pending"
  188. }
  189. val = json.dumps(log_data, ensure_ascii=False)
  190. # 2. 写入 Redis (rpush: 从右边插入)
  191. # 注意:这里使用你 RedisObject 实例化的对象,假设变量名叫 redis_client
  192. redis_client.rpush(REDIS_FAIL_LOG_KEY, val)
  193. # ==========================================
  194. # 3. 防爆逻辑 (放在这里)
  195. # ==========================================
  196. # 获取当前长度
  197. current_len = redis_client.llen(REDIS_FAIL_LOG_KEY)
  198. # 如果超过最大长度,从左边弹出最老的数据 (lpop)
  199. if current_len and current_len > MAX_LOG_LENGTH:
  200. # 循环弹出,直到长度符合要求(防止并发写入时瞬间超出很多)
  201. # 这里简单点只弹出一个也可以,通常够用了
  202. redis_client.lpop(REDIS_FAIL_LOG_KEY)
  203. except Exception as e:
  204. # 记录日志本身如果报错,打印一下,不要影响主业务
  205. LOGGER.error(f"写入Redis失败日志出错: {str(e)}")
  206. @staticmethod
  207. def get_failure_logs(page=1, page_size=20):
  208. """
  209. 获取失败记录列表(支持简单分页)
  210. Redis List 下标从 0 开始
  211. """
  212. start = (page - 1) * page_size
  213. end = start + page_size - 1
  214. # 使用 RedisObject 的 lrange 方法
  215. # 注意:lrange 获取的是 字节(bytes) 或 字符串,需要反序列化
  216. raw_list = redis_client.lrange(REDIS_FAIL_LOG_KEY, start, end)
  217. formatted_list = []
  218. if raw_list:
  219. for item in raw_list:
  220. try:
  221. # 如果 item 是 bytes,需要 decode,如果是 str 则不需要
  222. if isinstance(item, bytes):
  223. item = item.decode('utf-8')
  224. formatted_list.append(json.loads(item))
  225. except Exception as e:
  226. LOGGER.error(f"反序列化失败日志出错: {str(e)}")
  227. continue
  228. return formatted_list