UnicomComboTaskController.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import logging
  11. import threading
  12. import time
  13. from decimal import Decimal
  14. from django.db import transaction
  15. from django.db.models import Q
  16. from django.views import View
  17. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  18. IotCardUsageHistory
  19. from Object.RedisObject import RedisObject
  20. from Object.ResponseObject import ResponseObject
  21. from Object.UnicomObject import UnicomObjeect
  22. logger = logging.getLogger('info')
  23. class UnicomComboTaskView(View):
  24. def get(self, request, *args, **kwargs):
  25. request.encoding = 'utf-8'
  26. operation = kwargs.get('operation')
  27. return self.validation(request.GET, request, operation)
  28. def post(self, request, *args, **kwargs):
  29. request.encoding = 'utf-8'
  30. operation = kwargs.get('operation')
  31. return self.validation(request.POST, request, operation)
  32. def validation(self, request_dict, request, operation):
  33. response = ResponseObject()
  34. print(request)
  35. if operation == 'check-activate':
  36. return self.check_activate_combo(request_dict, response)
  37. elif operation == 'check-flow':
  38. return self.check_flow_usage(response)
  39. elif operation == 'check-flow-expire':
  40. return self.check_flow_expire(response)
  41. elif operation == 'check-expire':
  42. today = datetime.datetime.today()
  43. year = today.year
  44. month = today.month
  45. self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666')
  46. return response.json(0)
  47. elif operation == 'updateFlowUsed': # 更新流量使用
  48. self.unicom_flow_used(request_dict, response)
  49. return response.json(0)
  50. elif operation == 'queryFlowUsedHistory':
  51. return self.query_flow_used_history(response)
  52. elif operation == 'queryFlowCache':
  53. return self.query_flow_cache(response)
  54. elif operation == 'getDeviceUsageHistory':
  55. return self.get_device_usage_history(response)
  56. else:
  57. return response.json(414)
  58. @classmethod
  59. def check_activate_combo(cls, request_dict, response):
  60. """
  61. 定时检查是否有次月激活套餐
  62. @param request_dict:
  63. @param response:
  64. @return:
  65. """
  66. print(request_dict)
  67. logger.info('--->进入监控次月激活联通套餐')
  68. now_time = int(time.time())
  69. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True,
  70. activation_time__lte=now_time,
  71. expire_time__gte=now_time, is_del=0).values()
  72. if not combo_order_info_qs.exists():
  73. return response.json(0)
  74. try:
  75. today = datetime.datetime.today()
  76. year = today.year
  77. month = today.month
  78. with transaction.atomic():
  79. unicom_api = UnicomObjeect()
  80. for item in combo_order_info_qs:
  81. if item['order_id']:
  82. order_id = item['order_id']
  83. order_qs = Order_Model.objects.filter(orderID=order_id, status=1)
  84. if not order_qs.exists():
  85. continue
  86. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid'])
  87. # 当前已有套餐正在使用则跳出当前循环
  88. if combo_order_qs.exists():
  89. continue
  90. combo_id = item['combo_id']
  91. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  92. if not combo_qs.exists():
  93. continue
  94. # 查询当月用量情况
  95. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  96. flow_total_usage = Decimal(flow_total_usage).quantize(
  97. Decimal('0.00')) if flow_total_usage > 0 else 0
  98. flow_total_usage = str(flow_total_usage)
  99. iccid = item['iccid']
  100. # 检查激活iccid
  101. unicom_api.change_device_to_activate(iccid)
  102. cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage)
  103. logger.info('激活成功,订单编号:{}'.format(order_id))
  104. return response.json(0)
  105. except Exception as e:
  106. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  107. return response.json(177, repr(e))
  108. @classmethod
  109. def check_flow_usage(cls, response):
  110. """
  111. 检查流量使用情况
  112. @return:
  113. """
  114. logger.info('--->进入监控流量使用情况')
  115. try:
  116. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values()
  117. if not combo_order_qs.exists():
  118. return response.json(0)
  119. asy = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow, args=(combo_order_qs,))
  120. asy.start()
  121. return response.json(0)
  122. except Exception as e:
  123. logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  124. return response.json(177, repr(e))
  125. @classmethod
  126. def async_monitoring_flow(cls, combo_order_qs):
  127. """
  128. 异步检测流量使用详情
  129. """
  130. try:
  131. unicom_api = UnicomObjeect()
  132. today = datetime.datetime.today()
  133. year = today.year
  134. month = today.month
  135. now_time = int(time.time())
  136. for item in combo_order_qs:
  137. iccid = item['iccid']
  138. u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
  139. if not u_device_info_qs.exists():
  140. continue
  141. u_device_info_qs = u_device_info_qs.first()
  142. activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
  143. combo_id = item['combo_id']
  144. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  145. if not combo_qs.exists():
  146. continue
  147. combo_qs = combo_qs.first()
  148. flow_total = combo_qs['flow_total']
  149. # 队列已使用总流量总量
  150. flow_total_usage = unicom_api.get_flow_usage_total(iccid)
  151. flow_total_usage = float(flow_total_usage)
  152. is_expire = False
  153. flow = activate_usage_flow + flow_total
  154. if flow_total_usage > 0:
  155. # 初始套餐已使用流量 + 套餐总流量
  156. if flow_total_usage >= flow:
  157. is_expire = True
  158. usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0
  159. cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total,
  160. usage)
  161. # 检查是否有当月未使用套餐 没有则停卡
  162. if is_expire:
  163. flow_exceed = flow_total_usage - flow
  164. UnicomComboOrderInfo.objects.filter(id=item['id']) \
  165. .update(status=2, updated_time=now_time, flow_exceed=flow_exceed)
  166. activate_status = cls.query_unused_combo_and_activate(iccid, year, month,
  167. flow_total_usage)
  168. logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
  169. if not activate_status:
  170. # 停用
  171. unicom_api.change_device_to_disable(iccid)
  172. except Exception as e:
  173. logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  174. @staticmethod
  175. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  176. """
  177. 监控流量使用大于85%and小于96%进行消息推送提醒
  178. @param app_user_id: app用户id
  179. @param serial_no: 序列号
  180. @param combo_order_id: 当前套餐订单id
  181. @param flow_total: 套餐流量总量
  182. @param flow_usage: 套餐已使用流量
  183. @return:
  184. """
  185. try:
  186. if not app_user_id:
  187. return False
  188. now_time = int(time.time())
  189. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  190. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  191. 'updated_time': now_time,
  192. 'created_time': now_time, 'user_id': app_user_id}
  193. if 0 < flow_total and 0 < flow_usage < flow_total:
  194. res = flow_usage / flow_total * 100
  195. if 85 < res <= 95:
  196. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  197. if not flow_push.exists():
  198. UnicomFlowPush.objects.create(**push_data)
  199. elif flow_usage >= flow_total:
  200. push_data['flow_total_usage'] = flow_total
  201. push_data['type'] = 1
  202. UnicomFlowPush.objects.create(**push_data)
  203. return True
  204. except Exception as e:
  205. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  206. @staticmethod
  207. def query_unused_combo_and_activate(iccid, year, month, usage_flow):
  208. """
  209. 查询未使用套餐并激活
  210. @param iccid:
  211. @param year:
  212. @param month:
  213. @param usage_flow:
  214. @return:
  215. """
  216. try:
  217. now_time = int(time.time())
  218. combo_order_qs = UnicomComboOrderInfo.objects \
  219. .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \
  220. .order_by('created_time')
  221. if not combo_order_qs.exists():
  222. return False
  223. combo_order = combo_order_qs.first()
  224. if not combo_order.order_id:
  225. return False
  226. order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1)
  227. if not order_qs.exists():
  228. return False
  229. upd_data = {
  230. 'status': 1,
  231. 'year': year,
  232. 'month': month,
  233. 'flow_total_usage': str(usage_flow),
  234. 'activation_time': now_time,
  235. 'updated_time': now_time,
  236. }
  237. UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data)
  238. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  239. args=(iccid, combo_order.id, 3))
  240. asy.start()
  241. return True
  242. except Exception as e:
  243. logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  244. return False
  245. @classmethod
  246. def check_flow_expire(cls, response):
  247. """
  248. 检查流量到期停卡操作
  249. @param response:
  250. @return:
  251. """
  252. logger.info('--->进入监控流量到期停卡或激活叠加包')
  253. now_time = int(time.time())
  254. combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time,
  255. is_del=False).values()
  256. today = datetime.datetime.today()
  257. year = today.year
  258. month = today.month
  259. if not combo_order_qs.exists():
  260. return response.json(0)
  261. iccid_list = []
  262. with transaction.atomic():
  263. for item in combo_order_qs:
  264. try:
  265. icc_id = item['iccid']
  266. um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id)
  267. if not um_device_qs.exists():
  268. continue
  269. UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time)
  270. iccid_list.append(icc_id)
  271. logger.info('--->当前流量套餐已过期,iccid:{}'.format(icc_id))
  272. except Exception as e:
  273. logger.info('出错了~监控流量到期异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  274. continue
  275. # set无序不重复元素集
  276. iccid_list = list(set(iccid_list))
  277. unicom_api = UnicomObjeect()
  278. for item in iccid_list:
  279. activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time,
  280. is_del=False).values()
  281. if activate_combo_qs.exists():
  282. continue
  283. usage_flow = unicom_api.get_flow_usage_total(item)
  284. result = cls.query_unused_combo_and_activate(item, year, month, usage_flow)
  285. if not result:
  286. # 停用设备
  287. unicom_api.change_device_to_disable(item)
  288. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \
  289. .values('id').order_by('-updated_time')
  290. combo_order = combo_order_info_qs.first()
  291. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  292. args=(icc_id, combo_order['id'], 4))
  293. asy.start()
  294. else:
  295. unicom_api.change_device_to_activate(item)
  296. return response.json(0)
  297. @staticmethod
  298. def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
  299. """
  300. 异步保存消息推送 激活|过期
  301. @param iccid:
  302. @param combo_order_id:
  303. @param push_type:
  304. @return:
  305. """
  306. try:
  307. now_time = int(time.time())
  308. ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id')
  309. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'],
  310. 'status': 0, 'type': push_type,
  311. 'updated_time': now_time,
  312. 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']}
  313. UnicomFlowPush.objects.create(**push_data)
  314. except Exception as e:
  315. logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  316. @staticmethod
  317. def unicom_flow_used(request_dict, response):
  318. """
  319. 查询设备每张卡流量使用情况
  320. @param request_dict:
  321. @param response:
  322. @return:
  323. """
  324. page_size = int(request_dict.get('pageSize', 1))
  325. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  326. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  327. for page_number in range(1, total_pages + 1):
  328. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  329. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  330. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  331. asy.start()
  332. return response.json(0)
  333. @staticmethod
  334. def thread_collect_flow_used(u_device_qs):
  335. for item in u_device_qs:
  336. try:
  337. unicom_api = UnicomObjeect()
  338. n_time = int(time.time())
  339. # 队列已使用总流量总量
  340. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  341. UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
  342. sim_used_flow=flow_total_usage)
  343. except Exception as e:
  344. print(repr(e))
  345. continue
  346. @classmethod
  347. def query_flow_used_history(cls, response):
  348. # 获取符合条件的卡片对象查询集,并按创建时间升序排序
  349. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
  350. if not card_qs.exists():
  351. return response.json(0)
  352. asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,))
  353. asy.start()
  354. return response.json(0)
  355. @staticmethod
  356. def async_bulk_create_usage_history(qs):
  357. """
  358. 异步批量创建流量用量历史记录
  359. """
  360. redis_obj = RedisObject()
  361. current_time = int(time.time()) # 获取当前时间戳
  362. for item in qs:
  363. iccid = item['iccid']
  364. key = 'monthly_flow_' + iccid
  365. flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据
  366. iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象
  367. for k, v in flow_dict.items():
  368. try:
  369. cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型
  370. flow = float(v)
  371. iot_card_list.append(IotCardUsageHistory(
  372. iccid=iccid,
  373. card_type=1,
  374. cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201
  375. flow_total_usage=flow,
  376. created_time=current_time,
  377. updated_time=current_time
  378. ))
  379. except Exception as e:
  380. print(repr(e))
  381. continue
  382. # 批量创建IotCardUsageHistory对象
  383. if iot_card_list:
  384. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  385. @classmethod
  386. def query_flow_cache(cls, response):
  387. """
  388. 查询流量缓存永久的将设置过期时间为10分钟
  389. """
  390. redis = RedisObject()
  391. try:
  392. res = redis.get_keys('ASJ:UNICOM:FLOW:*')
  393. keys = [key.decode() for key in res]
  394. # 进行进一步的处理或打印
  395. for key in keys:
  396. ttl = redis.get_ttl(key)
  397. if ttl == -1:
  398. logger.info('iccidFlow:{}'.format(key))
  399. redis.CONN.expire(key, 60 * 10)
  400. return response.json(0)
  401. except Exception as e:
  402. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  403. return response.json(177, repr(e))
  404. @staticmethod
  405. def get_device_usage_history(response):
  406. """
  407. 查询联通设备历史用量
  408. """
  409. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid')
  410. if not card_qs.exists():
  411. return response.json(0)
  412. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history, args=(card_qs,))
  413. asy.start()
  414. return response.json(0)
  415. @staticmethod
  416. def async_update_device_usage_history(qs):
  417. """
  418. 异步更新设备用量历史
  419. @param qs: 联通iccid集合
  420. """
  421. try:
  422. u_service = UnicomObjeect()
  423. iot_card_list = []
  424. now_time = int(time.time())
  425. # 获取当前时间
  426. current_date = datetime.datetime.now()
  427. # 计算上个月的时间
  428. last_month_date = current_date - datetime.timedelta(days=current_date.day)
  429. # 例格式化日期为 "202307"
  430. formatted_date = last_month_date.strftime('%Y%m')
  431. for item in qs:
  432. params = {'iccid': item['iccid']}
  433. result = u_service.query_device_usage_history(**params)
  434. res_dict = u_service.get_text_dict(result)
  435. if res_dict['code'] == 0 and res_dict['data']:
  436. for cycle in res_dict['data']:
  437. if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != int(formatted_date):
  438. continue
  439. iot_card_list.append(IotCardUsageHistory(
  440. iccid=item['iccid'],
  441. card_type=1,
  442. cycle=cycle['cycle'],
  443. flow_total_usage=cycle['flowTotalUsage'],
  444. created_time=now_time,
  445. updated_time=now_time
  446. ))
  447. if not iot_card_list:
  448. return None
  449. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  450. except Exception as e:
  451. logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  452. return None