UnicomComboTaskController.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from decimal import Decimal
  15. from django.core.paginator import Paginator
  16. from django.db import transaction
  17. from django.db.models import Q
  18. from django.views import View
  19. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  20. IotCardUsageHistory, AccessNumberTaskQueue, IotCardOrderUsageHistory
  21. from Object.RedisObject import RedisObject
  22. from Object.ResponseObject import ResponseObject
  23. from Object.TelecomObject import TelecomObject
  24. from Object.UnicomObject import UnicomObjeect
  25. from Object.utils import LocalDateTimeUtil
  26. from Service.TelecomService import TelecomService
  27. logger = logging.getLogger('info')
  28. class UnicomComboTaskView(View):
  29. def get(self, request, *args, **kwargs):
  30. request.encoding = 'utf-8'
  31. operation = kwargs.get('operation')
  32. return self.validation(request.GET, request, operation)
  33. def post(self, request, *args, **kwargs):
  34. request.encoding = 'utf-8'
  35. operation = kwargs.get('operation')
  36. return self.validation(request.POST, request, operation)
  37. def validation(self, request_dict, request, operation):
  38. response = ResponseObject()
  39. print(request)
  40. if operation == 'check-activate':
  41. return self.check_activate_combo(request_dict, response)
  42. elif operation == 'check-flow':
  43. return self.check_flow_usage(response)
  44. elif operation == 'check-flow-expire':
  45. return self.check_flow_expire(response)
  46. elif operation == 'check-expire':
  47. today = datetime.datetime.today()
  48. year = today.year
  49. month = today.month
  50. self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666')
  51. return response.json(0)
  52. elif operation == 'updateFlowUsed': # 更新流量使用
  53. self.unicom_flow_used(request_dict, response)
  54. return response.json(0)
  55. elif operation == 'queryFlowUsedHistory':
  56. return self.query_flow_used_history(response)
  57. elif operation == 'queryFlowCache':
  58. return self.query_flow_cache(response)
  59. elif operation == 'getDeviceUsageHistory':
  60. return self.get_device_usage_history(response)
  61. elif operation == 'updateCardCycleFlow':
  62. return self.update_device_cycle_flow(response)
  63. elif operation == 'executeAccessTask':
  64. return self.get_access_number_change_task(response)
  65. elif operation == 'queryTotalTrafficToday':
  66. return self.query_total_traffic_today(response)
  67. else:
  68. return response.json(414)
  69. @classmethod
  70. def check_activate_combo(cls, request_dict, response):
  71. """
  72. 定时检查是否有次月激活套餐
  73. @param request_dict:
  74. @param response:
  75. @return:
  76. """
  77. print(request_dict)
  78. logger.info('--->进入监控次月激活联通套餐')
  79. now_time = int(time.time())
  80. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True,
  81. activation_time__lte=now_time,
  82. expire_time__gte=now_time, is_del=0).values()
  83. if not combo_order_info_qs.exists():
  84. return response.json(0)
  85. try:
  86. today = datetime.datetime.today()
  87. year = today.year
  88. month = today.month
  89. with transaction.atomic():
  90. unicom_api = UnicomObjeect()
  91. for item in combo_order_info_qs:
  92. if item['order_id']:
  93. order_id = item['order_id']
  94. order_qs = Order_Model.objects.filter(orderID=order_id, status=1)
  95. if not order_qs.exists():
  96. continue
  97. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid'])
  98. # 当前已有套餐正在使用则跳出当前循环
  99. if combo_order_qs.exists():
  100. continue
  101. combo_id = item['combo_id']
  102. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  103. if not combo_qs.exists():
  104. continue
  105. # 查询当月用量情况
  106. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  107. flow_total_usage = Decimal(flow_total_usage).quantize(
  108. Decimal('0.00')) if flow_total_usage > 0 else 0
  109. flow_total_usage = str(flow_total_usage)
  110. iccid = item['iccid']
  111. # 检查激活iccid
  112. unicom_api.change_device_to_activate(iccid)
  113. cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage)
  114. logger.info('激活成功,订单编号:{}'.format(order_id))
  115. return response.json(0)
  116. except Exception as e:
  117. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  118. return response.json(177, repr(e))
  119. @classmethod
  120. def check_flow_usage(cls, response):
  121. """
  122. 检查流量使用情况
  123. @return:
  124. """
  125. logger.info('--->进入监控流量使用情况')
  126. try:
  127. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values()
  128. if not combo_order_qs.exists():
  129. return response.json(0)
  130. asy = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow, args=(combo_order_qs,))
  131. asy.start()
  132. return response.json(0)
  133. except Exception as e:
  134. logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  135. return response.json(177, repr(e))
  136. @classmethod
  137. def async_monitoring_flow(cls, combo_order_qs):
  138. """
  139. 异步检测流量使用详情
  140. """
  141. try:
  142. unicom_api = UnicomObjeect()
  143. today = datetime.datetime.today()
  144. year = today.year
  145. month = today.month
  146. now_time = int(time.time())
  147. for item in combo_order_qs:
  148. iccid = item['iccid']
  149. u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
  150. if not u_device_info_qs.exists():
  151. continue
  152. u_device_info_qs = u_device_info_qs.first()
  153. card_type = u_device_info_qs.card_type
  154. activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
  155. combo_id = item['combo_id']
  156. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  157. if not combo_qs.exists():
  158. continue
  159. combo_qs = combo_qs.first()
  160. flow_total = combo_qs['flow_total']
  161. # 队列已使用总流量总量
  162. flow_total_usage = unicom_api.get_flow_usage_total(iccid, card_type)
  163. flow_total_usage = float(flow_total_usage)
  164. is_expire = False
  165. flow = activate_usage_flow + flow_total # 激活套餐时ICCID历史用量+当前套餐流量
  166. if flow_total_usage > 0:
  167. # 初始套餐已使用流量 + 套餐总流量
  168. if flow_total_usage >= flow:
  169. is_expire = True
  170. usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0
  171. cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total,
  172. usage)
  173. # 检查是否有当月未使用套餐 没有则停卡
  174. if is_expire:
  175. flow_exceed = flow_total_usage - flow
  176. UnicomComboOrderInfo.objects.filter(id=item['id']) \
  177. .update(status=2, updated_time=now_time, flow_exceed=flow_exceed)
  178. activate_status = cls.query_unused_combo_and_activate(iccid, year, month,
  179. flow_total_usage)
  180. logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
  181. if not activate_status: # 停用或断网
  182. if card_type == 3: # 鼎芯电信
  183. TelecomService().update_access_number_network(iccid, u_device_info_qs.access_number, 'ADD',
  184. '套餐流量已用完')
  185. else:
  186. unicom_api.change_device_to_disable(iccid=iccid, reason='套餐流量已用完')
  187. except Exception as e:
  188. logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  189. @staticmethod
  190. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  191. """
  192. 监控流量使用大于85%and小于96%进行消息推送提醒
  193. @param app_user_id: app用户id
  194. @param serial_no: 序列号
  195. @param combo_order_id: 当前套餐订单id
  196. @param flow_total: 套餐流量总量
  197. @param flow_usage: 套餐已使用流量
  198. @return:
  199. """
  200. try:
  201. if not app_user_id:
  202. return False
  203. now_time = int(time.time())
  204. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  205. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  206. 'updated_time': now_time,
  207. 'created_time': now_time, 'user_id': app_user_id}
  208. if 0 < flow_total and 0 < flow_usage < flow_total:
  209. res = flow_usage / flow_total * 100
  210. if 85 < res <= 95:
  211. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  212. if not flow_push.exists():
  213. UnicomFlowPush.objects.create(**push_data)
  214. elif flow_usage >= flow_total:
  215. push_data['flow_total_usage'] = flow_total
  216. push_data['type'] = 1
  217. UnicomFlowPush.objects.create(**push_data)
  218. return True
  219. except Exception as e:
  220. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  221. @staticmethod
  222. def query_unused_combo_and_activate(iccid, year, month, usage_flow):
  223. """
  224. 查询未使用套餐并激活
  225. @param iccid:
  226. @param year:
  227. @param month:
  228. @param usage_flow:
  229. @return:
  230. """
  231. try:
  232. now_time = int(time.time())
  233. combo_order_qs = UnicomComboOrderInfo.objects \
  234. .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \
  235. .order_by('created_time')
  236. if not combo_order_qs.exists():
  237. return False
  238. combo_order = combo_order_qs.first()
  239. if not combo_order.order_id:
  240. return False
  241. order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1)
  242. if not order_qs.exists():
  243. return False
  244. upd_data = {
  245. 'status': 1,
  246. 'year': year,
  247. 'month': month,
  248. 'flow_total_usage': str(usage_flow),
  249. 'activation_time': now_time,
  250. 'updated_time': now_time,
  251. }
  252. UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data)
  253. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  254. args=(iccid, combo_order.id, 3))
  255. asy.start()
  256. return True
  257. except Exception as e:
  258. logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  259. return False
  260. @classmethod
  261. def check_flow_expire(cls, response):
  262. """
  263. 检查流量到期停卡操作
  264. @param response:
  265. @return:
  266. """
  267. logger.info('check_flow_expire进入监控流量到期停卡或激活叠加包')
  268. now_time = int(time.time())
  269. combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time,
  270. is_del=False).values()
  271. if not combo_order_qs.exists():
  272. return response.json(0)
  273. asy = threading.Thread(target=UnicomComboTaskView.async_deactivate_expire_package,
  274. args=(combo_order_qs, now_time))
  275. asy.start()
  276. return response.json(0)
  277. @staticmethod
  278. def async_deactivate_expire_package(combo_order_qs, now_time):
  279. """
  280. 异步停用过期套餐
  281. @param combo_order_qs: 过期套餐querySet
  282. @param now_time 当前实际
  283. """
  284. today = datetime.datetime.today()
  285. year = today.year
  286. month = today.month
  287. iccid_list = []
  288. for item in combo_order_qs:
  289. try:
  290. icc_id = item['iccid']
  291. um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id)
  292. if not um_device_qs.exists():
  293. continue
  294. UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time)
  295. iccid_list.append(icc_id)
  296. logger.info('修改套餐已失效成功,iccid:{}'.format(icc_id))
  297. except Exception as e:
  298. logger.info('async_deactivate_expire_package套餐过期修改失效异常,{}'
  299. 'errLine:{}, errMsg:{}'.format(item['iccid'], e.__traceback__.tb_lineno, repr(e)))
  300. continue
  301. # set无序不重复元素集
  302. iccid_list = list(set(iccid_list))
  303. unicom_api = UnicomObjeect()
  304. for item in iccid_list:
  305. try:
  306. activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time,
  307. is_del=False).values()
  308. if activate_combo_qs.exists():
  309. continue
  310. usage_flow = unicom_api.get_flow_usage_total(item)
  311. # 查询是否有未使用套餐,有则进行激活 否 则调用API停卡
  312. result = UnicomComboTaskView().query_unused_combo_and_activate(item, year, month, usage_flow)
  313. if not result: # 没有可用套餐进行停卡
  314. # 停用设备
  315. unicom_api.change_device_to_disable(iccid=item, reason='没有可用套餐')
  316. logger.info('调用停卡API successful,iccid:{}'.format(item))
  317. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \
  318. .values('id').order_by('-updated_time')
  319. combo_order = combo_order_info_qs.first()
  320. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  321. args=(item, combo_order['id'], 4))
  322. asy.start()
  323. else:
  324. unicom_api.change_device_to_activate(item)
  325. except Exception as e:
  326. logger.info('async_deactivate_expire_package套餐过期停卡异常,{}'
  327. 'errLine:{}, errMsg:{}'.format(item, e.__traceback__.tb_lineno, repr(e)))
  328. continue
  329. @staticmethod
  330. def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
  331. """
  332. 异步保存消息推送 激活|过期
  333. @param iccid:
  334. @param combo_order_id:
  335. @param push_type:
  336. @return:
  337. """
  338. try:
  339. now_time = int(time.time())
  340. ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id')
  341. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'],
  342. 'status': 0, 'type': push_type,
  343. 'updated_time': now_time,
  344. 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']}
  345. UnicomFlowPush.objects.create(**push_data)
  346. except Exception as e:
  347. logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  348. @staticmethod
  349. def unicom_flow_used(request_dict, response):
  350. """
  351. 查询设备每张卡流量使用情况
  352. @param request_dict:
  353. @param response:
  354. @return:
  355. """
  356. page_size = int(request_dict.get('pageSize', 1))
  357. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  358. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  359. for page_number in range(1, total_pages + 1):
  360. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  361. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  362. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  363. asy.start()
  364. return response.json(0)
  365. @staticmethod
  366. def thread_collect_flow_used(u_device_qs):
  367. for item in u_device_qs:
  368. try:
  369. unicom_api = UnicomObjeect()
  370. n_time = int(time.time())
  371. # 队列已使用总流量总量
  372. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  373. UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
  374. sim_used_flow=flow_total_usage)
  375. except Exception as e:
  376. print(repr(e))
  377. continue
  378. @classmethod
  379. def query_flow_used_history(cls, response):
  380. # 获取符合条件的卡片对象查询集,并按创建时间升序排序
  381. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
  382. if not card_qs.exists():
  383. return response.json(0)
  384. asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,))
  385. asy.start()
  386. return response.json(0)
  387. @staticmethod
  388. def async_bulk_create_usage_history(qs):
  389. """
  390. 异步批量创建流量用量历史记录
  391. """
  392. redis_obj = RedisObject()
  393. current_time = int(time.time()) # 获取当前时间戳
  394. for item in qs:
  395. iccid = item['iccid']
  396. key = 'monthly_flow_' + iccid
  397. flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据
  398. iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象
  399. for k, v in flow_dict.items():
  400. try:
  401. cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型
  402. flow = float(v)
  403. iot_card_list.append(IotCardUsageHistory(
  404. iccid=iccid,
  405. card_type=1,
  406. cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201
  407. flow_total_usage=flow,
  408. created_time=current_time,
  409. updated_time=current_time
  410. ))
  411. except Exception as e:
  412. print(repr(e))
  413. continue
  414. # 批量创建IotCardUsageHistory对象
  415. if iot_card_list:
  416. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  417. @classmethod
  418. def query_flow_cache(cls, response):
  419. """
  420. 查询流量缓存永久的将设置过期时间为10分钟
  421. """
  422. redis = RedisObject()
  423. try:
  424. res = redis.get_keys('ASJ:UNICOM:FLOW:*')
  425. keys = [key.decode() for key in res]
  426. # 进行进一步的处理或打印
  427. for key in keys:
  428. ttl = redis.get_ttl(key)
  429. if ttl == -1:
  430. logger.info('iccidFlow:{}'.format(key))
  431. redis.CONN.expire(key, 60 * 10)
  432. return response.json(0)
  433. except Exception as e:
  434. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  435. return response.json(177, repr(e))
  436. @staticmethod
  437. def get_device_usage_history(response):
  438. """
  439. 查询联通设备历史用量
  440. """
  441. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid')
  442. if not card_qs.exists():
  443. return response.json(0)
  444. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history, args=(card_qs,))
  445. asy.start()
  446. return response.json(0)
  447. @staticmethod
  448. def async_update_device_usage_history(qs):
  449. """
  450. 异步更新设备用量历史
  451. @param qs: 联通iccid集合
  452. """
  453. try:
  454. u_service = UnicomObjeect()
  455. iot_card_list = []
  456. now_time = int(time.time())
  457. # 获取当前时间
  458. current_date = datetime.datetime.now()
  459. # 计算上个月的时间
  460. last_month_date = current_date - datetime.timedelta(days=current_date.day)
  461. # 例格式化日期为 "202307"
  462. formatted_date = last_month_date.strftime('%Y%m')
  463. for item in qs:
  464. params = {'iccid': item['iccid']}
  465. result = u_service.query_device_usage_history(**params)
  466. res_dict = u_service.get_text_dict(result)
  467. if res_dict['code'] == 0 and res_dict['data']:
  468. for cycle in res_dict['data']:
  469. if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != int(formatted_date):
  470. continue
  471. iot_card_list.append(IotCardUsageHistory(
  472. iccid=item['iccid'],
  473. card_type=1,
  474. cycle=cycle['cycle'],
  475. flow_total_usage=cycle['flowTotalUsage'],
  476. created_time=now_time,
  477. updated_time=now_time
  478. ))
  479. if not iot_card_list:
  480. return None
  481. # 使用Django的Paginator进行分页
  482. paginator = Paginator(iot_card_list, 300)
  483. for page_num in range(1, paginator.num_pages + 1):
  484. IotCardUsageHistory.objects.bulk_create(paginator.page(page_num).object_list)
  485. except Exception as e:
  486. logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  487. return None
  488. @staticmethod
  489. def update_device_cycle_flow(response):
  490. """
  491. 更新设备账期流量接口
  492. """
  493. card_qs = UnicomDeviceInfo.objects.filter(card_type=0, status=2).values('iccid')
  494. if not card_qs.exists():
  495. return response.json(0)
  496. logger.info('总数:{}'.format(card_qs.count()))
  497. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_cycle_flow, args=(card_qs,))
  498. asy.start()
  499. return response.json(0)
  500. @staticmethod
  501. def async_update_device_cycle_flow(qs):
  502. """
  503. 异步更新设备账期流量
  504. """
  505. try:
  506. logger.info('进入异步更新设备卡账期流量~~~~')
  507. unicom_api = UnicomObjeect()
  508. for item in qs:
  509. try:
  510. unicom_api.get_flow_usage_total(item['iccid'])
  511. except Exception as e:
  512. print(repr(e))
  513. continue
  514. except Exception as e:
  515. logger.info('更新账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  516. return None
  517. @staticmethod
  518. def get_access_number_change_task(response):
  519. """
  520. 获取接入号码变更任务
  521. """
  522. task_qs = AccessNumberTaskQueue.objects.exclude(status=1).filter(count__lt=4).order_by('created_time')
  523. if not task_qs.exists():
  524. return response.json(0)
  525. logger.info('接入卡号变更任务总数:{}'.format(task_qs.count()))
  526. asy = threading.Thread(target=UnicomComboTaskView.async_update_access_number_status, args=(task_qs,))
  527. asy.start()
  528. return response.json(0)
  529. @staticmethod
  530. def async_update_access_number_status(qs):
  531. """
  532. 异步更新设备账期流量
  533. """
  534. try:
  535. logger.info('进入异步更新接入卡号网络状态~~~~')
  536. redis = RedisObject()
  537. iccid_list = []
  538. for item in qs:
  539. try:
  540. key = f'ASJ:TELECOM:CHANGE:{item.iccid}'
  541. change_data = redis.get_data(key)
  542. if change_data:
  543. iccid_list.append(item.iccid)
  544. continue
  545. if item.iccid in iccid_list: # 避免同一ICCID多次执行
  546. continue
  547. action_value = 'DEL' if item.action == 2 else 'ADD'
  548. result = TelecomObject().single_cut_net(item.access_number, action_value) # 变更网络状态
  549. now_time = int(time.time())
  550. count = item.count + 1
  551. cache_data = {'iccid': item.iccid, 'actionValue': action_value}
  552. if not result: # 失败修改执行次数
  553. iccid_list.append(item.iccid)
  554. data = {'status': 2, 'count': count, 'updated_time': now_time}
  555. AccessNumberTaskQueue.objects.filter(id=item.id).update(**data)
  556. redis.CONN.setnx(key, json.dumps(cache_data))
  557. redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态
  558. continue
  559. data = {'status': 1, 'count': count, 'completion_time': now_time, 'updated_time': now_time}
  560. if result == '-5': # 已符合变更后状态
  561. data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'}
  562. else:
  563. data['result'] = result
  564. iccid_list.append(item.iccid)
  565. AccessNumberTaskQueue.objects.filter(id=item.id).update(**data) # 成功后修改任务记录状态
  566. redis.CONN.setnx(key, json.dumps(cache_data))
  567. redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态
  568. except Exception as e:
  569. print(repr(e))
  570. continue
  571. except Exception as e:
  572. logger.info('异步变更卡网络状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  573. @staticmethod
  574. def query_total_traffic_today(response):
  575. """
  576. 4G订单查询今日总流量消耗
  577. @return:
  578. """
  579. asy = threading.Thread(target=UnicomComboTaskView.async_save_today_use_float)
  580. asy.start()
  581. logger.info('查询4G订单总消耗流量记录')
  582. return response.json(0)
  583. @staticmethod
  584. def async_save_today_use_float():
  585. # 查询正在使用套餐并且大于等于2024年激活 不等于免费测试流量的订单数据
  586. today = datetime.datetime.today()
  587. n_data = today.strftime('%Y%m%d')
  588. start_time, end_time = LocalDateTimeUtil.get_start_and_end_time(today.strftime('%Y-%m-%d'), '%Y-%m-%d')
  589. o_combo_qs = UnicomComboOrderInfo.objects \
  590. .filter(status=1, year__gte=2024, created_time__gte=1714492800, created_time__lt=start_time) \
  591. .exclude(combo__combo_type=4) \
  592. .values('order_id', 'iccid', 'flow_total_usage')
  593. try:
  594. unicom_api = UnicomObjeect()
  595. if not o_combo_qs.exists():
  596. return
  597. n_time = int(time.time())
  598. day = n_data[-2:]
  599. iccid_list = []
  600. for item in o_combo_qs:
  601. iccid = item['iccid']
  602. try:
  603. flow = float(unicom_api.get_flow_usage_total(iccid))
  604. if flow == 0:
  605. continue
  606. old_flow = float(item['flow_total_usage'])
  607. if flow == old_flow:
  608. continue
  609. total_usage = Decimal(flow - old_flow).quantize(Decimal('0.00'))
  610. iccid_list.append(IotCardOrderUsageHistory(
  611. iccid=iccid,
  612. order_id=item['order_id'],
  613. card_type=1,
  614. cycle=int(n_data), # 将日期转换为整数形式,如202201
  615. cycle_date=int(day),
  616. total_traffic=total_usage,
  617. created_time=n_time,
  618. updated_time=n_time
  619. ))
  620. if len(iccid_list) >= 300:
  621. IotCardOrderUsageHistory.objects.bulk_create(iccid_list)
  622. iccid_list = []
  623. except Exception as e:
  624. logger.error(
  625. '查询日用量异常iccid{},errLine:{}, errMsg:{}'.format(iccid, e.__traceback__.tb_lineno, repr(e)))
  626. if iccid_list:
  627. IotCardOrderUsageHistory.objects.bulk_create(iccid_list)
  628. except Exception as e:
  629. logger.error('统计4G卡日用量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))