UnicomComboTaskController.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from decimal import Decimal
  15. from django.db import transaction
  16. from django.db.models import Q
  17. from django.views import View
  18. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  19. IotCardUsageHistory, AccessNumberTaskQueue
  20. from Object.RedisObject import RedisObject
  21. from Object.ResponseObject import ResponseObject
  22. from Object.TelecomObject import TelecomObject
  23. from Object.UnicomObject import UnicomObjeect
  24. from Service.TelecomService import TelecomService
# Shared module logger, fetched under the name 'info'; the view class below logs through it.
logger = logging.getLogger('info')
  26. class UnicomComboTaskView(View):
  27. def get(self, request, *args, **kwargs):
  28. request.encoding = 'utf-8'
  29. operation = kwargs.get('operation')
  30. return self.validation(request.GET, request, operation)
  31. def post(self, request, *args, **kwargs):
  32. request.encoding = 'utf-8'
  33. operation = kwargs.get('operation')
  34. return self.validation(request.POST, request, operation)
  35. def validation(self, request_dict, request, operation):
  36. response = ResponseObject()
  37. print(request)
  38. if operation == 'check-activate':
  39. return self.check_activate_combo(request_dict, response)
  40. elif operation == 'check-flow':
  41. return self.check_flow_usage(response)
  42. elif operation == 'check-flow-expire':
  43. return self.check_flow_expire(response)
  44. elif operation == 'check-expire':
  45. today = datetime.datetime.today()
  46. year = today.year
  47. month = today.month
  48. self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666')
  49. return response.json(0)
  50. elif operation == 'updateFlowUsed': # 更新流量使用
  51. self.unicom_flow_used(request_dict, response)
  52. return response.json(0)
  53. elif operation == 'queryFlowUsedHistory':
  54. return self.query_flow_used_history(response)
  55. elif operation == 'queryFlowCache':
  56. return self.query_flow_cache(response)
  57. elif operation == 'getDeviceUsageHistory':
  58. return self.get_device_usage_history(response)
  59. elif operation == 'updateCardCycleFlow':
  60. return self.update_device_cycle_flow(response)
  61. elif operation == 'executeAccessTask':
  62. return self.get_access_number_change_task(response)
  63. else:
  64. return response.json(414)
  65. @classmethod
  66. def check_activate_combo(cls, request_dict, response):
  67. """
  68. 定时检查是否有次月激活套餐
  69. @param request_dict:
  70. @param response:
  71. @return:
  72. """
  73. print(request_dict)
  74. logger.info('--->进入监控次月激活联通套餐')
  75. now_time = int(time.time())
  76. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True,
  77. activation_time__lte=now_time,
  78. expire_time__gte=now_time, is_del=0).values()
  79. if not combo_order_info_qs.exists():
  80. return response.json(0)
  81. try:
  82. today = datetime.datetime.today()
  83. year = today.year
  84. month = today.month
  85. with transaction.atomic():
  86. unicom_api = UnicomObjeect()
  87. for item in combo_order_info_qs:
  88. if item['order_id']:
  89. order_id = item['order_id']
  90. order_qs = Order_Model.objects.filter(orderID=order_id, status=1)
  91. if not order_qs.exists():
  92. continue
  93. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid'])
  94. # 当前已有套餐正在使用则跳出当前循环
  95. if combo_order_qs.exists():
  96. continue
  97. combo_id = item['combo_id']
  98. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  99. if not combo_qs.exists():
  100. continue
  101. # 查询当月用量情况
  102. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  103. flow_total_usage = Decimal(flow_total_usage).quantize(
  104. Decimal('0.00')) if flow_total_usage > 0 else 0
  105. flow_total_usage = str(flow_total_usage)
  106. iccid = item['iccid']
  107. # 检查激活iccid
  108. unicom_api.change_device_to_activate(iccid)
  109. cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage)
  110. logger.info('激活成功,订单编号:{}'.format(order_id))
  111. return response.json(0)
  112. except Exception as e:
  113. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  114. return response.json(177, repr(e))
  115. @classmethod
  116. def check_flow_usage(cls, response):
  117. """
  118. 检查流量使用情况
  119. @return:
  120. """
  121. logger.info('--->进入监控流量使用情况')
  122. try:
  123. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values()
  124. if not combo_order_qs.exists():
  125. return response.json(0)
  126. asy = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow, args=(combo_order_qs,))
  127. asy.start()
  128. return response.json(0)
  129. except Exception as e:
  130. logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  131. return response.json(177, repr(e))
  132. @classmethod
  133. def async_monitoring_flow(cls, combo_order_qs):
  134. """
  135. 异步检测流量使用详情
  136. """
  137. try:
  138. unicom_api = UnicomObjeect()
  139. today = datetime.datetime.today()
  140. year = today.year
  141. month = today.month
  142. now_time = int(time.time())
  143. for item in combo_order_qs:
  144. iccid = item['iccid']
  145. u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
  146. if not u_device_info_qs.exists():
  147. continue
  148. u_device_info_qs = u_device_info_qs.first()
  149. card_type = u_device_info_qs.card_type
  150. activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
  151. combo_id = item['combo_id']
  152. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  153. if not combo_qs.exists():
  154. continue
  155. combo_qs = combo_qs.first()
  156. flow_total = combo_qs['flow_total']
  157. # 队列已使用总流量总量
  158. flow_total_usage = unicom_api.get_flow_usage_total(iccid, card_type)
  159. flow_total_usage = float(flow_total_usage)
  160. is_expire = False
  161. flow = activate_usage_flow + flow_total # 激活套餐时ICCID历史用量+当前套餐流量
  162. if flow_total_usage > 0:
  163. # 初始套餐已使用流量 + 套餐总流量
  164. if flow_total_usage >= flow:
  165. is_expire = True
  166. usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0
  167. cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total,
  168. usage)
  169. # 检查是否有当月未使用套餐 没有则停卡
  170. if is_expire:
  171. flow_exceed = flow_total_usage - flow
  172. UnicomComboOrderInfo.objects.filter(id=item['id']) \
  173. .update(status=2, updated_time=now_time, flow_exceed=flow_exceed)
  174. activate_status = cls.query_unused_combo_and_activate(iccid, year, month,
  175. flow_total_usage)
  176. logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
  177. if not activate_status: # 停用或断网
  178. if card_type == 3: # 鼎芯电信
  179. TelecomService().update_access_number_network(iccid, u_device_info_qs.access_number, 'ADD',
  180. '套餐流量已用完')
  181. else:
  182. unicom_api.change_device_to_disable(iccid=iccid, reason='套餐流量已用完')
  183. except Exception as e:
  184. logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  185. @staticmethod
  186. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  187. """
  188. 监控流量使用大于85%and小于96%进行消息推送提醒
  189. @param app_user_id: app用户id
  190. @param serial_no: 序列号
  191. @param combo_order_id: 当前套餐订单id
  192. @param flow_total: 套餐流量总量
  193. @param flow_usage: 套餐已使用流量
  194. @return:
  195. """
  196. try:
  197. if not app_user_id:
  198. return False
  199. now_time = int(time.time())
  200. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  201. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  202. 'updated_time': now_time,
  203. 'created_time': now_time, 'user_id': app_user_id}
  204. if 0 < flow_total and 0 < flow_usage < flow_total:
  205. res = flow_usage / flow_total * 100
  206. if 85 < res <= 95:
  207. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  208. if not flow_push.exists():
  209. UnicomFlowPush.objects.create(**push_data)
  210. elif flow_usage >= flow_total:
  211. push_data['flow_total_usage'] = flow_total
  212. push_data['type'] = 1
  213. UnicomFlowPush.objects.create(**push_data)
  214. return True
  215. except Exception as e:
  216. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  217. @staticmethod
  218. def query_unused_combo_and_activate(iccid, year, month, usage_flow):
  219. """
  220. 查询未使用套餐并激活
  221. @param iccid:
  222. @param year:
  223. @param month:
  224. @param usage_flow:
  225. @return:
  226. """
  227. try:
  228. now_time = int(time.time())
  229. combo_order_qs = UnicomComboOrderInfo.objects \
  230. .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \
  231. .order_by('created_time')
  232. if not combo_order_qs.exists():
  233. return False
  234. combo_order = combo_order_qs.first()
  235. if not combo_order.order_id:
  236. return False
  237. order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1)
  238. if not order_qs.exists():
  239. return False
  240. upd_data = {
  241. 'status': 1,
  242. 'year': year,
  243. 'month': month,
  244. 'flow_total_usage': str(usage_flow),
  245. 'activation_time': now_time,
  246. 'updated_time': now_time,
  247. }
  248. UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data)
  249. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  250. args=(iccid, combo_order.id, 3))
  251. asy.start()
  252. return True
  253. except Exception as e:
  254. logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  255. return False
  256. @classmethod
  257. def check_flow_expire(cls, response):
  258. """
  259. 检查流量到期停卡操作
  260. @param response:
  261. @return:
  262. """
  263. logger.info('check_flow_expire进入监控流量到期停卡或激活叠加包')
  264. now_time = int(time.time())
  265. combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time,
  266. is_del=False).values()
  267. if not combo_order_qs.exists():
  268. return response.json(0)
  269. asy = threading.Thread(target=UnicomComboTaskView.async_deactivate_expire_package,
  270. args=(combo_order_qs, now_time))
  271. asy.start()
  272. return response.json(0)
  273. @staticmethod
  274. def async_deactivate_expire_package(combo_order_qs, now_time):
  275. """
  276. 异步停用过期套餐
  277. @param combo_order_qs: 过期套餐querySet
  278. @param now_time 当前实际
  279. """
  280. today = datetime.datetime.today()
  281. year = today.year
  282. month = today.month
  283. iccid_list = []
  284. for item in combo_order_qs:
  285. try:
  286. icc_id = item['iccid']
  287. um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id)
  288. if not um_device_qs.exists():
  289. continue
  290. UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time)
  291. iccid_list.append(icc_id)
  292. logger.info('修改套餐已失效成功,iccid:{}'.format(icc_id))
  293. except Exception as e:
  294. logger.info('async_deactivate_expire_package套餐过期修改失效异常,{}'
  295. 'errLine:{}, errMsg:{}'.format(item['iccid'], e.__traceback__.tb_lineno, repr(e)))
  296. continue
  297. # set无序不重复元素集
  298. iccid_list = list(set(iccid_list))
  299. unicom_api = UnicomObjeect()
  300. for item in iccid_list:
  301. try:
  302. activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time,
  303. is_del=False).values()
  304. if activate_combo_qs.exists():
  305. continue
  306. usage_flow = unicom_api.get_flow_usage_total(item)
  307. # 查询是否有未使用套餐,有则进行激活 否 则调用API停卡
  308. result = UnicomComboTaskView().query_unused_combo_and_activate(item, year, month, usage_flow)
  309. if not result: # 没有可用套餐进行停卡
  310. # 停用设备
  311. unicom_api.change_device_to_disable(iccid=item, reason='没有可用套餐')
  312. logger.info('调用停卡API successful,iccid:{}'.format(item))
  313. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \
  314. .values('id').order_by('-updated_time')
  315. combo_order = combo_order_info_qs.first()
  316. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  317. args=(item, combo_order['id'], 4))
  318. asy.start()
  319. else:
  320. unicom_api.change_device_to_activate(item)
  321. except Exception as e:
  322. logger.info('async_deactivate_expire_package套餐过期停卡异常,{}'
  323. 'errLine:{}, errMsg:{}'.format(item, e.__traceback__.tb_lineno, repr(e)))
  324. continue
  325. @staticmethod
  326. def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
  327. """
  328. 异步保存消息推送 激活|过期
  329. @param iccid:
  330. @param combo_order_id:
  331. @param push_type:
  332. @return:
  333. """
  334. try:
  335. now_time = int(time.time())
  336. ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id')
  337. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'],
  338. 'status': 0, 'type': push_type,
  339. 'updated_time': now_time,
  340. 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']}
  341. UnicomFlowPush.objects.create(**push_data)
  342. except Exception as e:
  343. logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  344. @staticmethod
  345. def unicom_flow_used(request_dict, response):
  346. """
  347. 查询设备每张卡流量使用情况
  348. @param request_dict:
  349. @param response:
  350. @return:
  351. """
  352. page_size = int(request_dict.get('pageSize', 1))
  353. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  354. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  355. for page_number in range(1, total_pages + 1):
  356. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  357. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  358. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  359. asy.start()
  360. return response.json(0)
  361. @staticmethod
  362. def thread_collect_flow_used(u_device_qs):
  363. for item in u_device_qs:
  364. try:
  365. unicom_api = UnicomObjeect()
  366. n_time = int(time.time())
  367. # 队列已使用总流量总量
  368. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  369. UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
  370. sim_used_flow=flow_total_usage)
  371. except Exception as e:
  372. print(repr(e))
  373. continue
  374. @classmethod
  375. def query_flow_used_history(cls, response):
  376. # 获取符合条件的卡片对象查询集,并按创建时间升序排序
  377. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
  378. if not card_qs.exists():
  379. return response.json(0)
  380. asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,))
  381. asy.start()
  382. return response.json(0)
  383. @staticmethod
  384. def async_bulk_create_usage_history(qs):
  385. """
  386. 异步批量创建流量用量历史记录
  387. """
  388. redis_obj = RedisObject()
  389. current_time = int(time.time()) # 获取当前时间戳
  390. for item in qs:
  391. iccid = item['iccid']
  392. key = 'monthly_flow_' + iccid
  393. flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据
  394. iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象
  395. for k, v in flow_dict.items():
  396. try:
  397. cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型
  398. flow = float(v)
  399. iot_card_list.append(IotCardUsageHistory(
  400. iccid=iccid,
  401. card_type=1,
  402. cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201
  403. flow_total_usage=flow,
  404. created_time=current_time,
  405. updated_time=current_time
  406. ))
  407. except Exception as e:
  408. print(repr(e))
  409. continue
  410. # 批量创建IotCardUsageHistory对象
  411. if iot_card_list:
  412. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  413. @classmethod
  414. def query_flow_cache(cls, response):
  415. """
  416. 查询流量缓存永久的将设置过期时间为10分钟
  417. """
  418. redis = RedisObject()
  419. try:
  420. res = redis.get_keys('ASJ:UNICOM:FLOW:*')
  421. keys = [key.decode() for key in res]
  422. # 进行进一步的处理或打印
  423. for key in keys:
  424. ttl = redis.get_ttl(key)
  425. if ttl == -1:
  426. logger.info('iccidFlow:{}'.format(key))
  427. redis.CONN.expire(key, 60 * 10)
  428. return response.json(0)
  429. except Exception as e:
  430. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  431. return response.json(177, repr(e))
  432. @staticmethod
  433. def get_device_usage_history(response):
  434. """
  435. 查询联通设备历史用量
  436. """
  437. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid')
  438. if not card_qs.exists():
  439. return response.json(0)
  440. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history, args=(card_qs,))
  441. asy.start()
  442. return response.json(0)
  443. @staticmethod
  444. def async_update_device_usage_history(qs):
  445. """
  446. 异步更新设备用量历史
  447. @param qs: 联通iccid集合
  448. """
  449. try:
  450. u_service = UnicomObjeect()
  451. iot_card_list = []
  452. now_time = int(time.time())
  453. # 获取当前时间
  454. current_date = datetime.datetime.now()
  455. # 计算上个月的时间
  456. last_month_date = current_date - datetime.timedelta(days=current_date.day)
  457. # 例格式化日期为 "202307"
  458. formatted_date = last_month_date.strftime('%Y%m')
  459. for item in qs:
  460. params = {'iccid': item['iccid']}
  461. result = u_service.query_device_usage_history(**params)
  462. res_dict = u_service.get_text_dict(result)
  463. if res_dict['code'] == 0 and res_dict['data']:
  464. for cycle in res_dict['data']:
  465. if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != int(formatted_date):
  466. continue
  467. iot_card_list.append(IotCardUsageHistory(
  468. iccid=item['iccid'],
  469. card_type=1,
  470. cycle=cycle['cycle'],
  471. flow_total_usage=cycle['flowTotalUsage'],
  472. created_time=now_time,
  473. updated_time=now_time
  474. ))
  475. if not iot_card_list:
  476. return None
  477. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  478. except Exception as e:
  479. logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  480. return None
  481. @staticmethod
  482. def update_device_cycle_flow(response):
  483. """
  484. 更新设备账期流量接口
  485. """
  486. card_qs = UnicomDeviceInfo.objects.filter(card_type=0, status=2).values('iccid')
  487. if not card_qs.exists():
  488. return response.json(0)
  489. logger.info('总数:{}'.format(card_qs.count()))
  490. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_cycle_flow, args=(card_qs,))
  491. asy.start()
  492. return response.json(0)
  493. @staticmethod
  494. def async_update_device_cycle_flow(qs):
  495. """
  496. 异步更新设备账期流量
  497. """
  498. try:
  499. logger.info('进入异步更新设备卡账期流量~~~~')
  500. unicom_api = UnicomObjeect()
  501. for item in qs:
  502. try:
  503. unicom_api.get_flow_usage_total(item['iccid'])
  504. except Exception as e:
  505. print(repr(e))
  506. continue
  507. except Exception as e:
  508. logger.info('更新账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  509. return None
  510. @staticmethod
  511. def get_access_number_change_task(response):
  512. """
  513. 获取接入号码变更任务
  514. """
  515. task_qs = AccessNumberTaskQueue.objects.exclude(status=1).filter(count__lt=4).order_by('created_time')
  516. if not task_qs.exists():
  517. return response.json(0)
  518. logger.info('接入卡号变更任务总数:{}'.format(task_qs.count()))
  519. asy = threading.Thread(target=UnicomComboTaskView.async_update_access_number_status, args=(task_qs,))
  520. asy.start()
  521. return response.json(0)
  522. @staticmethod
  523. def async_update_access_number_status(qs):
  524. """
  525. 异步更新设备账期流量
  526. """
  527. try:
  528. logger.info('进入异步更新接入卡号网络状态~~~~')
  529. redis = RedisObject()
  530. iccid_list = []
  531. for item in qs:
  532. try:
  533. key = f'ASJ:TELECOM:CHANGE:{item.iccid}'
  534. change_data = redis.get_data(key)
  535. if change_data:
  536. iccid_list.append(item.iccid)
  537. continue
  538. if item.iccid in iccid_list: # 避免同一ICCID多次执行
  539. continue
  540. action_value = 'DEL' if item.action == 2 else 'ADD'
  541. result = TelecomObject().single_cut_net(item.access_number, action_value) # 变更网络状态
  542. now_time = int(time.time())
  543. count = item.count + 1
  544. cache_data = {'iccid': item.iccid, 'actionValue': action_value}
  545. if not result: # 失败修改执行次数
  546. iccid_list.append(item.iccid)
  547. data = {'status': 2, 'count': count, 'updated_time': now_time}
  548. AccessNumberTaskQueue.objects.filter(id=item.id).update(**data)
  549. redis.CONN.setnx(key, json.dumps(cache_data))
  550. redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态
  551. continue
  552. data = {'status': 1, 'count': count, 'completion_time': now_time, 'updated_time': now_time}
  553. if result == '-5': # 已符合变更后状态
  554. data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'}
  555. else:
  556. data['result'] = result
  557. iccid_list.append(item.iccid)
  558. AccessNumberTaskQueue.objects.filter(id=item.id).update(**data) # 成功后修改任务记录状态
  559. redis.CONN.setnx(key, json.dumps(cache_data))
  560. redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态
  561. except Exception as e:
  562. print(repr(e))
  563. continue
  564. except Exception as e:
  565. logger.info('异步变更卡网络状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))