UnicomComboTaskController.py 18 KB


  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import logging
  11. import threading
  12. import time
  13. from decimal import Decimal
  14. from django.db import transaction
  15. from django.db.models import Q
  16. from django.views import View
  17. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  18. IotCardUsageHistory
  19. from Object.RedisObject import RedisObject
  20. from Object.ResponseObject import ResponseObject
  21. from Object.UnicomObject import UnicomObjeect
# Module-level logger; every task in this module reports through the logger named 'info'.
logger = logging.getLogger('info')
class UnicomComboTaskView(View):
    """
    View exposing the Unicom combo maintenance tasks.

    GET and POST requests are both routed through ``validation``, which selects a task by the
    ``operation`` URL keyword argument (e.g. 'check-activate', 'check-flow', 'check-flow-expire').
    """
  24. def get(self, request, *args, **kwargs):
  25. request.encoding = 'utf-8'
  26. operation = kwargs.get('operation')
  27. return self.validation(request.GET, request, operation)
  28. def post(self, request, *args, **kwargs):
  29. request.encoding = 'utf-8'
  30. operation = kwargs.get('operation')
  31. return self.validation(request.POST, request, operation)
  32. def validation(self, request_dict, request, operation):
  33. response = ResponseObject()
  34. print(request)
  35. if operation == 'check-activate':
  36. return self.check_activate_combo(request_dict, response)
  37. elif operation == 'check-flow':
  38. return self.check_flow_usage(response)
  39. elif operation == 'check-flow-expire':
  40. return self.check_flow_expire(response)
  41. elif operation == 'check-expire':
  42. today = datetime.datetime.today()
  43. year = today.year
  44. month = today.month
  45. self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666')
  46. return response.json(0)
  47. elif operation == 'updateFlowUsed': # 更新流量使用
  48. self.unicom_flow_used(request_dict, response)
  49. return response.json(0)
  50. elif operation == 'queryFlowUsedHistory':
  51. return self.query_flow_used_history(response)
  52. elif operation == 'queryFlowCache':
  53. return self.query_flow_cache(response)
  54. @classmethod
  55. def check_activate_combo(cls, request_dict, response):
  56. """
  57. 定时检查是否有次月激活套餐
  58. @param request_dict:
  59. @param response:
  60. @return:
  61. """
  62. print(request_dict)
  63. logger.info('--->进入监控次月激活联通套餐')
  64. now_time = int(time.time())
  65. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True,
  66. activation_time__lte=now_time,
  67. expire_time__gte=now_time, is_del=0).values()
  68. if not combo_order_info_qs.exists():
  69. return response.json(0)
  70. try:
  71. today = datetime.datetime.today()
  72. year = today.year
  73. month = today.month
  74. with transaction.atomic():
  75. unicom_api = UnicomObjeect()
  76. for item in combo_order_info_qs:
  77. if item['order_id']:
  78. order_id = item['order_id']
  79. order_qs = Order_Model.objects.filter(orderID=order_id, status=1)
  80. if not order_qs.exists():
  81. continue
  82. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid'])
  83. # 当前已有套餐正在使用则跳出当前循环
  84. if combo_order_qs.exists():
  85. continue
  86. combo_id = item['combo_id']
  87. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  88. if not combo_qs.exists():
  89. continue
  90. # 查询当月用量情况
  91. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  92. flow_total_usage = Decimal(flow_total_usage).quantize(
  93. Decimal('0.00')) if flow_total_usage > 0 else 0
  94. flow_total_usage = str(flow_total_usage)
  95. iccid = item['iccid']
  96. # 检查激活iccid
  97. unicom_api.change_device_to_activate(iccid)
  98. cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage)
  99. logger.info('激活成功,订单编号:{}'.format(order_id))
  100. return response.json(0)
  101. except Exception as e:
  102. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  103. return response.json(177, repr(e))
  104. @classmethod
  105. def check_flow_usage(cls, response):
  106. """
  107. 检查流量使用情况
  108. @return:
  109. """
  110. logger.info('--->进入监控流量使用情况')
  111. try:
  112. unicom_api = UnicomObjeect()
  113. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values()
  114. if not combo_order_qs.exists():
  115. return response.json(0)
  116. today = datetime.datetime.today()
  117. year = today.year
  118. month = today.month
  119. now_time = int(time.time())
  120. for item in combo_order_qs:
  121. iccid = item['iccid']
  122. u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
  123. if not u_device_info_qs.exists():
  124. continue
  125. u_device_info_qs = u_device_info_qs.first()
  126. activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
  127. combo_id = item['combo_id']
  128. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  129. if not combo_qs.exists():
  130. continue
  131. combo_qs = combo_qs.first()
  132. flow_total = combo_qs['flow_total']
  133. # 队列已使用总流量总量
  134. flow_total_usage = unicom_api.get_flow_usage_total(iccid)
  135. is_expire = False
  136. flow = activate_usage_flow + flow_total
  137. if flow_total_usage > 0:
  138. # 初始套餐已使用流量 + 套餐总流量
  139. if flow_total_usage >= flow:
  140. is_expire = True
  141. usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0
  142. cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total,
  143. usage)
  144. # 检查是否有当月未使用套餐 没有则停卡
  145. if is_expire:
  146. flow_exceed = flow_total_usage - flow
  147. UnicomComboOrderInfo.objects.filter(id=item['id']) \
  148. .update(status=2, updated_time=now_time, flow_exceed=flow_exceed)
  149. activate_status = cls.query_unused_combo_and_activate(iccid, year, month,
  150. flow_total_usage)
  151. logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
  152. if not activate_status:
  153. # 停用
  154. unicom_api.change_device_to_disable(iccid)
  155. return response.json(0)
  156. except Exception as e:
  157. logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  158. return response.json(177, repr(e))
  159. @staticmethod
  160. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  161. """
  162. 监控流量使用大于85%and小于96%进行消息推送提醒
  163. @param app_user_id: app用户id
  164. @param serial_no: 序列号
  165. @param combo_order_id: 当前套餐订单id
  166. @param flow_total: 套餐流量总量
  167. @param flow_usage: 套餐已使用流量
  168. @return:
  169. """
  170. try:
  171. if not app_user_id:
  172. return False
  173. now_time = int(time.time())
  174. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  175. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  176. 'updated_time': now_time,
  177. 'created_time': now_time, 'user_id': app_user_id}
  178. if 0 < flow_total and 0 < flow_usage < flow_total:
  179. res = flow_usage / flow_total * 100
  180. if 85 < res <= 95:
  181. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  182. if not flow_push.exists():
  183. UnicomFlowPush.objects.create(**push_data)
  184. elif flow_usage >= flow_total:
  185. push_data['flow_total_usage'] = flow_total
  186. push_data['type'] = 1
  187. UnicomFlowPush.objects.create(**push_data)
  188. return True
  189. except Exception as e:
  190. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  191. @staticmethod
  192. def query_unused_combo_and_activate(iccid, year, month, usage_flow):
  193. """
  194. 查询未使用套餐并激活
  195. @param iccid:
  196. @param year:
  197. @param month:
  198. @param usage_flow:
  199. @return:
  200. """
  201. try:
  202. now_time = int(time.time())
  203. combo_order_qs = UnicomComboOrderInfo.objects \
  204. .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \
  205. .order_by('created_time')
  206. if not combo_order_qs.exists():
  207. return False
  208. combo_order = combo_order_qs.first()
  209. if not combo_order.order_id:
  210. return False
  211. order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1)
  212. if not order_qs.exists():
  213. return False
  214. upd_data = {
  215. 'status': 1,
  216. 'year': year,
  217. 'month': month,
  218. 'flow_total_usage': str(usage_flow),
  219. 'activation_time': now_time,
  220. 'updated_time': now_time,
  221. }
  222. UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data)
  223. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  224. args=(iccid, combo_order.id, 3))
  225. asy.start()
  226. return True
  227. except Exception as e:
  228. logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  229. return False
  230. @classmethod
  231. def check_flow_expire(cls, response):
  232. """
  233. 检查流量到期停卡操作
  234. @param response:
  235. @return:
  236. """
  237. logger.info('--->进入监控流量到期停卡或激活叠加包')
  238. now_time = int(time.time())
  239. combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time,
  240. is_del=False).values()
  241. today = datetime.datetime.today()
  242. year = today.year
  243. month = today.month
  244. if not combo_order_qs.exists():
  245. return response.json(0)
  246. iccid_list = []
  247. with transaction.atomic():
  248. for item in combo_order_qs:
  249. try:
  250. icc_id = item['iccid']
  251. um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id)
  252. if not um_device_qs.exists():
  253. continue
  254. UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time)
  255. iccid_list.append(icc_id)
  256. logger.info('--->当前流量套餐已过期,iccid:{}'.format(icc_id))
  257. except Exception as e:
  258. logger.info('出错了~监控流量到期异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  259. continue
  260. # set无序不重复元素集
  261. iccid_list = list(set(iccid_list))
  262. unicom_api = UnicomObjeect()
  263. for item in iccid_list:
  264. activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time,
  265. is_del=False).values()
  266. if activate_combo_qs.exists():
  267. continue
  268. usage_flow = unicom_api.get_flow_usage_total(item)
  269. result = cls.query_unused_combo_and_activate(item, year, month, usage_flow)
  270. if not result:
  271. # 停用设备
  272. unicom_api.change_device_to_disable(item)
  273. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \
  274. .values('id').order_by('-updated_time')
  275. combo_order = combo_order_info_qs.first()
  276. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  277. args=(icc_id, combo_order['id'], 4))
  278. asy.start()
  279. else:
  280. unicom_api.change_device_to_activate(item)
  281. return response.json(0)
  282. @staticmethod
  283. def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
  284. """
  285. 异步保存消息推送 激活|过期
  286. @param iccid:
  287. @param combo_order_id:
  288. @param push_type:
  289. @return:
  290. """
  291. try:
  292. now_time = int(time.time())
  293. ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id')
  294. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'],
  295. 'status': 0, 'type': push_type,
  296. 'updated_time': now_time,
  297. 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']}
  298. UnicomFlowPush.objects.create(**push_data)
  299. except Exception as e:
  300. logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  301. @staticmethod
  302. def unicom_flow_used(request_dict, response):
  303. """
  304. 查询设备每张卡流量使用情况
  305. @param request_dict:
  306. @param response:
  307. @return:
  308. """
  309. page_size = int(request_dict.get('pageSize', 1))
  310. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  311. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  312. for page_number in range(1, total_pages + 1):
  313. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  314. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  315. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  316. asy.start()
  317. return response.json(0)
  318. @staticmethod
  319. def thread_collect_flow_used(u_device_qs):
  320. for item in u_device_qs:
  321. try:
  322. unicom_api = UnicomObjeect()
  323. n_time = int(time.time())
  324. # 队列已使用总流量总量
  325. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  326. UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
  327. sim_used_flow=flow_total_usage)
  328. except Exception as e:
  329. print(repr(e))
  330. continue
  331. @classmethod
  332. def query_flow_used_history(cls, response):
  333. # 获取符合条件的卡片对象查询集,并按创建时间升序排序
  334. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
  335. if not card_qs.exists():
  336. return response.json(0)
  337. asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,))
  338. asy.start()
  339. return response.json(0)
  340. @staticmethod
  341. def async_bulk_create_usage_history(qs):
  342. """
  343. 异步批量创建流量用量历史记录
  344. """
  345. redis_obj = RedisObject()
  346. current_time = int(time.time()) # 获取当前时间戳
  347. for item in qs:
  348. iccid = item['iccid']
  349. key = 'monthly_flow_' + iccid
  350. flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据
  351. iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象
  352. for k, v in flow_dict.items():
  353. try:
  354. cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型
  355. flow = float(v)
  356. iot_card_list.append(IotCardUsageHistory(
  357. iccid=iccid,
  358. card_type=1,
  359. cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201
  360. flow_total_usage=flow,
  361. created_time=current_time,
  362. updated_time=current_time
  363. ))
  364. except Exception as e:
  365. print(repr(e))
  366. continue
  367. # 批量创建IotCardUsageHistory对象
  368. if iot_card_list:
  369. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  370. @classmethod
  371. def query_flow_cache(cls, response):
  372. """
  373. 查询流量缓存永久的将设置过期时间为10分钟
  374. """
  375. redis = RedisObject()
  376. try:
  377. res = redis.get_keys('ASJ:UNICOM:FLOW:*')
  378. keys = [key.decode() for key in res]
  379. # 进行进一步的处理或打印
  380. for key in keys:
  381. ttl = redis.get_ttl(key)
  382. if ttl == -1:
  383. logger.info('iccidFlow:{}'.format(key))
  384. redis.CONN.expire(key, 60 * 10)
  385. return response.json(0)
  386. except Exception as e:
  387. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  388. return response.json(177, repr(e))