UnicomComboTaskController.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import logging
  11. import threading
  12. import time
  13. from decimal import Decimal
  14. from django.db import transaction
  15. from django.db.models import Q
  16. from django.views import View
  17. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  18. IotCardUsageHistory
  19. from Object.RedisObject import RedisObject
  20. from Object.ResponseObject import ResponseObject
  21. from Object.UnicomObject import UnicomObjeect
  22. from Service.TelecomService import TelecomService
  23. logger = logging.getLogger('info')
  24. class UnicomComboTaskView(View):
  25. def get(self, request, *args, **kwargs):
  26. request.encoding = 'utf-8'
  27. operation = kwargs.get('operation')
  28. return self.validation(request.GET, request, operation)
  29. def post(self, request, *args, **kwargs):
  30. request.encoding = 'utf-8'
  31. operation = kwargs.get('operation')
  32. return self.validation(request.POST, request, operation)
  33. def validation(self, request_dict, request, operation):
  34. response = ResponseObject()
  35. print(request)
  36. if operation == 'check-activate':
  37. return self.check_activate_combo(request_dict, response)
  38. elif operation == 'check-flow':
  39. return self.check_flow_usage(response)
  40. elif operation == 'check-flow-expire':
  41. return self.check_flow_expire(response)
  42. elif operation == 'check-expire':
  43. today = datetime.datetime.today()
  44. year = today.year
  45. month = today.month
  46. self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666')
  47. return response.json(0)
  48. elif operation == 'updateFlowUsed': # 更新流量使用
  49. self.unicom_flow_used(request_dict, response)
  50. return response.json(0)
  51. elif operation == 'queryFlowUsedHistory':
  52. return self.query_flow_used_history(response)
  53. elif operation == 'queryFlowCache':
  54. return self.query_flow_cache(response)
  55. elif operation == 'getDeviceUsageHistory':
  56. return self.get_device_usage_history(response)
  57. elif operation == 'updateCardCycleFlow':
  58. return self.update_device_cycle_flow(response)
  59. else:
  60. return response.json(414)
  61. @classmethod
  62. def check_activate_combo(cls, request_dict, response):
  63. """
  64. 定时检查是否有次月激活套餐
  65. @param request_dict:
  66. @param response:
  67. @return:
  68. """
  69. print(request_dict)
  70. logger.info('--->进入监控次月激活联通套餐')
  71. now_time = int(time.time())
  72. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True,
  73. activation_time__lte=now_time,
  74. expire_time__gte=now_time, is_del=0).values()
  75. if not combo_order_info_qs.exists():
  76. return response.json(0)
  77. try:
  78. today = datetime.datetime.today()
  79. year = today.year
  80. month = today.month
  81. with transaction.atomic():
  82. unicom_api = UnicomObjeect()
  83. for item in combo_order_info_qs:
  84. if item['order_id']:
  85. order_id = item['order_id']
  86. order_qs = Order_Model.objects.filter(orderID=order_id, status=1)
  87. if not order_qs.exists():
  88. continue
  89. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid'])
  90. # 当前已有套餐正在使用则跳出当前循环
  91. if combo_order_qs.exists():
  92. continue
  93. combo_id = item['combo_id']
  94. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  95. if not combo_qs.exists():
  96. continue
  97. # 查询当月用量情况
  98. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  99. flow_total_usage = Decimal(flow_total_usage).quantize(
  100. Decimal('0.00')) if flow_total_usage > 0 else 0
  101. flow_total_usage = str(flow_total_usage)
  102. iccid = item['iccid']
  103. # 检查激活iccid
  104. unicom_api.change_device_to_activate(iccid)
  105. cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage)
  106. logger.info('激活成功,订单编号:{}'.format(order_id))
  107. return response.json(0)
  108. except Exception as e:
  109. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  110. return response.json(177, repr(e))
  111. @classmethod
  112. def check_flow_usage(cls, response):
  113. """
  114. 检查流量使用情况
  115. @return:
  116. """
  117. logger.info('--->进入监控流量使用情况')
  118. try:
  119. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values()
  120. if not combo_order_qs.exists():
  121. return response.json(0)
  122. asy = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow, args=(combo_order_qs,))
  123. asy.start()
  124. return response.json(0)
  125. except Exception as e:
  126. logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  127. return response.json(177, repr(e))
  128. @classmethod
  129. def async_monitoring_flow(cls, combo_order_qs):
  130. """
  131. 异步检测流量使用详情
  132. """
  133. try:
  134. unicom_api = UnicomObjeect()
  135. today = datetime.datetime.today()
  136. year = today.year
  137. month = today.month
  138. now_time = int(time.time())
  139. for item in combo_order_qs:
  140. iccid = item['iccid']
  141. u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
  142. if not u_device_info_qs.exists():
  143. continue
  144. u_device_info_qs = u_device_info_qs.first()
  145. card_type = u_device_info_qs.card_type
  146. activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
  147. combo_id = item['combo_id']
  148. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  149. if not combo_qs.exists():
  150. continue
  151. combo_qs = combo_qs.first()
  152. flow_total = combo_qs['flow_total']
  153. # 队列已使用总流量总量
  154. flow_total_usage = unicom_api.get_flow_usage_total(iccid, card_type)
  155. flow_total_usage = float(flow_total_usage)
  156. is_expire = False
  157. flow = activate_usage_flow + flow_total # 激活套餐时ICCID历史用量+当前套餐流量
  158. if flow_total_usage > 0:
  159. # 初始套餐已使用流量 + 套餐总流量
  160. if flow_total_usage >= flow:
  161. is_expire = True
  162. usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0
  163. cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total,
  164. usage)
  165. # 检查是否有当月未使用套餐 没有则停卡
  166. if is_expire:
  167. flow_exceed = flow_total_usage - flow
  168. UnicomComboOrderInfo.objects.filter(id=item['id']) \
  169. .update(status=2, updated_time=now_time, flow_exceed=flow_exceed)
  170. activate_status = cls.query_unused_combo_and_activate(iccid, year, month,
  171. flow_total_usage)
  172. logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
  173. if not activate_status: # 停用或断网
  174. if card_type == 3: # 鼎芯电信
  175. TelecomService().update_access_number_network(u_device_info_qs.access_number, 'ADD')
  176. else:
  177. unicom_api.change_device_to_disable(iccid)
  178. except Exception as e:
  179. logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  180. @staticmethod
  181. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  182. """
  183. 监控流量使用大于85%and小于96%进行消息推送提醒
  184. @param app_user_id: app用户id
  185. @param serial_no: 序列号
  186. @param combo_order_id: 当前套餐订单id
  187. @param flow_total: 套餐流量总量
  188. @param flow_usage: 套餐已使用流量
  189. @return:
  190. """
  191. try:
  192. if not app_user_id:
  193. return False
  194. now_time = int(time.time())
  195. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  196. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  197. 'updated_time': now_time,
  198. 'created_time': now_time, 'user_id': app_user_id}
  199. if 0 < flow_total and 0 < flow_usage < flow_total:
  200. res = flow_usage / flow_total * 100
  201. if 85 < res <= 95:
  202. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  203. if not flow_push.exists():
  204. UnicomFlowPush.objects.create(**push_data)
  205. elif flow_usage >= flow_total:
  206. push_data['flow_total_usage'] = flow_total
  207. push_data['type'] = 1
  208. UnicomFlowPush.objects.create(**push_data)
  209. return True
  210. except Exception as e:
  211. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  212. @staticmethod
  213. def query_unused_combo_and_activate(iccid, year, month, usage_flow):
  214. """
  215. 查询未使用套餐并激活
  216. @param iccid:
  217. @param year:
  218. @param month:
  219. @param usage_flow:
  220. @return:
  221. """
  222. try:
  223. now_time = int(time.time())
  224. combo_order_qs = UnicomComboOrderInfo.objects \
  225. .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \
  226. .order_by('created_time')
  227. if not combo_order_qs.exists():
  228. return False
  229. combo_order = combo_order_qs.first()
  230. if not combo_order.order_id:
  231. return False
  232. order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1)
  233. if not order_qs.exists():
  234. return False
  235. upd_data = {
  236. 'status': 1,
  237. 'year': year,
  238. 'month': month,
  239. 'flow_total_usage': str(usage_flow),
  240. 'activation_time': now_time,
  241. 'updated_time': now_time,
  242. }
  243. UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data)
  244. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  245. args=(iccid, combo_order.id, 3))
  246. asy.start()
  247. return True
  248. except Exception as e:
  249. logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  250. return False
  251. @classmethod
  252. def check_flow_expire(cls, response):
  253. """
  254. 检查流量到期停卡操作
  255. @param response:
  256. @return:
  257. """
  258. logger.info('check_flow_expire进入监控流量到期停卡或激活叠加包')
  259. now_time = int(time.time())
  260. combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time,
  261. is_del=False).values()
  262. if not combo_order_qs.exists():
  263. return response.json(0)
  264. asy = threading.Thread(target=UnicomComboTaskView.async_deactivate_expire_package,
  265. args=(combo_order_qs, now_time))
  266. asy.start()
  267. return response.json(0)
  268. @staticmethod
  269. def async_deactivate_expire_package(combo_order_qs, now_time):
  270. """
  271. 异步停用过期套餐
  272. @param combo_order_qs: 过期套餐querySet
  273. @param now_time 当前实际
  274. """
  275. today = datetime.datetime.today()
  276. year = today.year
  277. month = today.month
  278. iccid_list = []
  279. for item in combo_order_qs:
  280. try:
  281. icc_id = item['iccid']
  282. um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id)
  283. if not um_device_qs.exists():
  284. continue
  285. UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time)
  286. iccid_list.append(icc_id)
  287. logger.info('修改套餐已失效成功,iccid:{}'.format(icc_id))
  288. except Exception as e:
  289. logger.info('async_deactivate_expire_package套餐过期修改失效异常,{}'
  290. 'errLine:{}, errMsg:{}'.format(item['iccid'], e.__traceback__.tb_lineno, repr(e)))
  291. continue
  292. # set无序不重复元素集
  293. iccid_list = list(set(iccid_list))
  294. unicom_api = UnicomObjeect()
  295. for item in iccid_list:
  296. try:
  297. activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time,
  298. is_del=False).values()
  299. if activate_combo_qs.exists():
  300. continue
  301. usage_flow = unicom_api.get_flow_usage_total(item)
  302. # 查询是否有未使用套餐,有则进行激活 否 则调用API停卡
  303. result = UnicomComboTaskView().query_unused_combo_and_activate(item, year, month, usage_flow)
  304. if not result: # 没有可用套餐进行停卡
  305. # 停用设备
  306. unicom_api.change_device_to_disable(item)
  307. logger.info('调用停卡API successful,iccid:{}'.format(item))
  308. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \
  309. .values('id').order_by('-updated_time')
  310. combo_order = combo_order_info_qs.first()
  311. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  312. args=(item, combo_order['id'], 4))
  313. asy.start()
  314. else:
  315. unicom_api.change_device_to_activate(item)
  316. except Exception as e:
  317. logger.info('async_deactivate_expire_package套餐过期停卡异常,{}'
  318. 'errLine:{}, errMsg:{}'.format(item, e.__traceback__.tb_lineno, repr(e)))
  319. continue
  320. @staticmethod
  321. def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
  322. """
  323. 异步保存消息推送 激活|过期
  324. @param iccid:
  325. @param combo_order_id:
  326. @param push_type:
  327. @return:
  328. """
  329. try:
  330. now_time = int(time.time())
  331. ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id')
  332. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'],
  333. 'status': 0, 'type': push_type,
  334. 'updated_time': now_time,
  335. 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']}
  336. UnicomFlowPush.objects.create(**push_data)
  337. except Exception as e:
  338. logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  339. @staticmethod
  340. def unicom_flow_used(request_dict, response):
  341. """
  342. 查询设备每张卡流量使用情况
  343. @param request_dict:
  344. @param response:
  345. @return:
  346. """
  347. page_size = int(request_dict.get('pageSize', 1))
  348. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  349. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  350. for page_number in range(1, total_pages + 1):
  351. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  352. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  353. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  354. asy.start()
  355. return response.json(0)
  356. @staticmethod
  357. def thread_collect_flow_used(u_device_qs):
  358. for item in u_device_qs:
  359. try:
  360. unicom_api = UnicomObjeect()
  361. n_time = int(time.time())
  362. # 队列已使用总流量总量
  363. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  364. UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
  365. sim_used_flow=flow_total_usage)
  366. except Exception as e:
  367. print(repr(e))
  368. continue
  369. @classmethod
  370. def query_flow_used_history(cls, response):
  371. # 获取符合条件的卡片对象查询集,并按创建时间升序排序
  372. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
  373. if not card_qs.exists():
  374. return response.json(0)
  375. asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,))
  376. asy.start()
  377. return response.json(0)
  378. @staticmethod
  379. def async_bulk_create_usage_history(qs):
  380. """
  381. 异步批量创建流量用量历史记录
  382. """
  383. redis_obj = RedisObject()
  384. current_time = int(time.time()) # 获取当前时间戳
  385. for item in qs:
  386. iccid = item['iccid']
  387. key = 'monthly_flow_' + iccid
  388. flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据
  389. iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象
  390. for k, v in flow_dict.items():
  391. try:
  392. cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型
  393. flow = float(v)
  394. iot_card_list.append(IotCardUsageHistory(
  395. iccid=iccid,
  396. card_type=1,
  397. cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201
  398. flow_total_usage=flow,
  399. created_time=current_time,
  400. updated_time=current_time
  401. ))
  402. except Exception as e:
  403. print(repr(e))
  404. continue
  405. # 批量创建IotCardUsageHistory对象
  406. if iot_card_list:
  407. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  408. @classmethod
  409. def query_flow_cache(cls, response):
  410. """
  411. 查询流量缓存永久的将设置过期时间为10分钟
  412. """
  413. redis = RedisObject()
  414. try:
  415. res = redis.get_keys('ASJ:UNICOM:FLOW:*')
  416. keys = [key.decode() for key in res]
  417. # 进行进一步的处理或打印
  418. for key in keys:
  419. ttl = redis.get_ttl(key)
  420. if ttl == -1:
  421. logger.info('iccidFlow:{}'.format(key))
  422. redis.CONN.expire(key, 60 * 10)
  423. return response.json(0)
  424. except Exception as e:
  425. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  426. return response.json(177, repr(e))
  427. @staticmethod
  428. def get_device_usage_history(response):
  429. """
  430. 查询联通设备历史用量
  431. """
  432. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid')
  433. if not card_qs.exists():
  434. return response.json(0)
  435. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history, args=(card_qs,))
  436. asy.start()
  437. return response.json(0)
  438. @staticmethod
  439. def async_update_device_usage_history(qs):
  440. """
  441. 异步更新设备用量历史
  442. @param qs: 联通iccid集合
  443. """
  444. try:
  445. u_service = UnicomObjeect()
  446. iot_card_list = []
  447. now_time = int(time.time())
  448. # 获取当前时间
  449. current_date = datetime.datetime.now()
  450. # 计算上个月的时间
  451. last_month_date = current_date - datetime.timedelta(days=current_date.day)
  452. # 例格式化日期为 "202307"
  453. formatted_date = last_month_date.strftime('%Y%m')
  454. for item in qs:
  455. params = {'iccid': item['iccid']}
  456. result = u_service.query_device_usage_history(**params)
  457. res_dict = u_service.get_text_dict(result)
  458. if res_dict['code'] == 0 and res_dict['data']:
  459. for cycle in res_dict['data']:
  460. if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != int(formatted_date):
  461. continue
  462. iot_card_list.append(IotCardUsageHistory(
  463. iccid=item['iccid'],
  464. card_type=1,
  465. cycle=cycle['cycle'],
  466. flow_total_usage=cycle['flowTotalUsage'],
  467. created_time=now_time,
  468. updated_time=now_time
  469. ))
  470. if not iot_card_list:
  471. return None
  472. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  473. except Exception as e:
  474. logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  475. return None
  476. @staticmethod
  477. def update_device_cycle_flow(response):
  478. """
  479. 更新设备账期流量接口
  480. """
  481. card_qs = UnicomDeviceInfo.objects.filter(card_type=0, status=2).values('iccid')
  482. if not card_qs.exists():
  483. return response.json(0)
  484. logger.info('总数:{}'.format(card_qs.count()))
  485. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_cycle_flow, args=(card_qs,))
  486. asy.start()
  487. return response.json(0)
  488. @staticmethod
  489. def async_update_device_cycle_flow(qs):
  490. """
  491. 异步更新设备账期流量
  492. """
  493. try:
  494. logger.info('进入异步更新设备卡账期流量~~~~')
  495. unicom_api = UnicomObjeect()
  496. for item in qs:
  497. try:
  498. unicom_api.get_flow_usage_total(item['iccid'])
  499. except Exception as e:
  500. print(repr(e))
  501. continue
  502. except Exception as e:
  503. logger.info('更新账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  504. return None