UnicomComboTaskController.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from decimal import Decimal
  15. from django.core.paginator import Paginator
  16. from django.db import transaction
  17. from django.db.models import Q
  18. from django.views import View
  19. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  20. IotCardUsageHistory, AccessNumberTaskQueue
  21. from Object.RedisObject import RedisObject
  22. from Object.ResponseObject import ResponseObject
  23. from Object.TelecomObject import TelecomObject
  24. from Object.UnicomObject import UnicomObjeect
  25. from Service.TelecomService import TelecomService
  26. logger = logging.getLogger('info')
  27. class UnicomComboTaskView(View):
  28. def get(self, request, *args, **kwargs):
  29. request.encoding = 'utf-8'
  30. operation = kwargs.get('operation')
  31. return self.validation(request.GET, request, operation)
  32. def post(self, request, *args, **kwargs):
  33. request.encoding = 'utf-8'
  34. operation = kwargs.get('operation')
  35. return self.validation(request.POST, request, operation)
  36. def validation(self, request_dict, request, operation):
  37. response = ResponseObject()
  38. print(request)
  39. if operation == 'check-activate':
  40. return self.check_activate_combo(request_dict, response)
  41. elif operation == 'check-flow':
  42. return self.check_flow_usage(response)
  43. elif operation == 'check-flow-expire':
  44. return self.check_flow_expire(response)
  45. elif operation == 'check-expire':
  46. today = datetime.datetime.today()
  47. year = today.year
  48. month = today.month
  49. self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666')
  50. return response.json(0)
  51. elif operation == 'updateFlowUsed': # 更新流量使用
  52. self.unicom_flow_used(request_dict, response)
  53. return response.json(0)
  54. elif operation == 'queryFlowUsedHistory':
  55. return self.query_flow_used_history(response)
  56. elif operation == 'queryFlowCache':
  57. return self.query_flow_cache(response)
  58. elif operation == 'getDeviceUsageHistory':
  59. return self.get_device_usage_history(response)
  60. elif operation == 'updateCardCycleFlow':
  61. return self.update_device_cycle_flow(response)
  62. elif operation == 'executeAccessTask':
  63. return self.get_access_number_change_task(response)
  64. else:
  65. return response.json(414)
  66. @classmethod
  67. def check_activate_combo(cls, request_dict, response):
  68. """
  69. 定时检查是否有次月激活套餐
  70. @param request_dict:
  71. @param response:
  72. @return:
  73. """
  74. print(request_dict)
  75. logger.info('--->进入监控次月激活联通套餐')
  76. now_time = int(time.time())
  77. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True,
  78. activation_time__lte=now_time,
  79. expire_time__gte=now_time, is_del=0).values()
  80. if not combo_order_info_qs.exists():
  81. return response.json(0)
  82. try:
  83. today = datetime.datetime.today()
  84. year = today.year
  85. month = today.month
  86. with transaction.atomic():
  87. unicom_api = UnicomObjeect()
  88. for item in combo_order_info_qs:
  89. if item['order_id']:
  90. order_id = item['order_id']
  91. order_qs = Order_Model.objects.filter(orderID=order_id, status=1)
  92. if not order_qs.exists():
  93. continue
  94. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid'])
  95. # 当前已有套餐正在使用则跳出当前循环
  96. if combo_order_qs.exists():
  97. continue
  98. combo_id = item['combo_id']
  99. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  100. if not combo_qs.exists():
  101. continue
  102. # 查询当月用量情况
  103. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  104. flow_total_usage = Decimal(flow_total_usage).quantize(
  105. Decimal('0.00')) if flow_total_usage > 0 else 0
  106. flow_total_usage = str(flow_total_usage)
  107. iccid = item['iccid']
  108. # 检查激活iccid
  109. unicom_api.change_device_to_activate(iccid)
  110. cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage)
  111. logger.info('激活成功,订单编号:{}'.format(order_id))
  112. return response.json(0)
  113. except Exception as e:
  114. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  115. return response.json(177, repr(e))
  116. @classmethod
  117. def check_flow_usage(cls, response):
  118. """
  119. 检查流量使用情况
  120. @return:
  121. """
  122. logger.info('--->进入监控流量使用情况')
  123. try:
  124. combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values()
  125. if not combo_order_qs.exists():
  126. return response.json(0)
  127. asy = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow, args=(combo_order_qs,))
  128. asy.start()
  129. return response.json(0)
  130. except Exception as e:
  131. logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  132. return response.json(177, repr(e))
  133. @classmethod
  134. def async_monitoring_flow(cls, combo_order_qs):
  135. """
  136. 异步检测流量使用详情
  137. """
  138. try:
  139. unicom_api = UnicomObjeect()
  140. today = datetime.datetime.today()
  141. year = today.year
  142. month = today.month
  143. now_time = int(time.time())
  144. for item in combo_order_qs:
  145. iccid = item['iccid']
  146. u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid)
  147. if not u_device_info_qs.exists():
  148. continue
  149. u_device_info_qs = u_device_info_qs.first()
  150. card_type = u_device_info_qs.card_type
  151. activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
  152. combo_id = item['combo_id']
  153. combo_qs = UnicomCombo.objects.filter(id=combo_id).values()
  154. if not combo_qs.exists():
  155. continue
  156. combo_qs = combo_qs.first()
  157. flow_total = combo_qs['flow_total']
  158. # 队列已使用总流量总量
  159. flow_total_usage = unicom_api.get_flow_usage_total(iccid, card_type)
  160. flow_total_usage = float(flow_total_usage)
  161. is_expire = False
  162. flow = activate_usage_flow + flow_total # 激活套餐时ICCID历史用量+当前套餐流量
  163. if flow_total_usage > 0:
  164. # 初始套餐已使用流量 + 套餐总流量
  165. if flow_total_usage >= flow:
  166. is_expire = True
  167. usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0
  168. cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total,
  169. usage)
  170. # 检查是否有当月未使用套餐 没有则停卡
  171. if is_expire:
  172. flow_exceed = flow_total_usage - flow
  173. UnicomComboOrderInfo.objects.filter(id=item['id']) \
  174. .update(status=2, updated_time=now_time, flow_exceed=flow_exceed)
  175. activate_status = cls.query_unused_combo_and_activate(iccid, year, month,
  176. flow_total_usage)
  177. logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
  178. if not activate_status: # 停用或断网
  179. if card_type == 3: # 鼎芯电信
  180. TelecomService().update_access_number_network(iccid, u_device_info_qs.access_number, 'ADD',
  181. '套餐流量已用完')
  182. else:
  183. unicom_api.change_device_to_disable(iccid=iccid, reason='套餐流量已用完')
  184. except Exception as e:
  185. logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  186. @staticmethod
  187. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  188. """
  189. 监控流量使用大于85%and小于96%进行消息推送提醒
  190. @param app_user_id: app用户id
  191. @param serial_no: 序列号
  192. @param combo_order_id: 当前套餐订单id
  193. @param flow_total: 套餐流量总量
  194. @param flow_usage: 套餐已使用流量
  195. @return:
  196. """
  197. try:
  198. if not app_user_id:
  199. return False
  200. now_time = int(time.time())
  201. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  202. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  203. 'updated_time': now_time,
  204. 'created_time': now_time, 'user_id': app_user_id}
  205. if 0 < flow_total and 0 < flow_usage < flow_total:
  206. res = flow_usage / flow_total * 100
  207. if 85 < res <= 95:
  208. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  209. if not flow_push.exists():
  210. UnicomFlowPush.objects.create(**push_data)
  211. elif flow_usage >= flow_total:
  212. push_data['flow_total_usage'] = flow_total
  213. push_data['type'] = 1
  214. UnicomFlowPush.objects.create(**push_data)
  215. return True
  216. except Exception as e:
  217. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  218. @staticmethod
  219. def query_unused_combo_and_activate(iccid, year, month, usage_flow):
  220. """
  221. 查询未使用套餐并激活
  222. @param iccid:
  223. @param year:
  224. @param month:
  225. @param usage_flow:
  226. @return:
  227. """
  228. try:
  229. now_time = int(time.time())
  230. combo_order_qs = UnicomComboOrderInfo.objects \
  231. .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \
  232. .order_by('created_time')
  233. if not combo_order_qs.exists():
  234. return False
  235. combo_order = combo_order_qs.first()
  236. if not combo_order.order_id:
  237. return False
  238. order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1)
  239. if not order_qs.exists():
  240. return False
  241. upd_data = {
  242. 'status': 1,
  243. 'year': year,
  244. 'month': month,
  245. 'flow_total_usage': str(usage_flow),
  246. 'activation_time': now_time,
  247. 'updated_time': now_time,
  248. }
  249. UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data)
  250. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  251. args=(iccid, combo_order.id, 3))
  252. asy.start()
  253. return True
  254. except Exception as e:
  255. logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  256. return False
  257. @classmethod
  258. def check_flow_expire(cls, response):
  259. """
  260. 检查流量到期停卡操作
  261. @param response:
  262. @return:
  263. """
  264. logger.info('check_flow_expire进入监控流量到期停卡或激活叠加包')
  265. now_time = int(time.time())
  266. combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time,
  267. is_del=False).values()
  268. if not combo_order_qs.exists():
  269. return response.json(0)
  270. asy = threading.Thread(target=UnicomComboTaskView.async_deactivate_expire_package,
  271. args=(combo_order_qs, now_time))
  272. asy.start()
  273. return response.json(0)
  274. @staticmethod
  275. def async_deactivate_expire_package(combo_order_qs, now_time):
  276. """
  277. 异步停用过期套餐
  278. @param combo_order_qs: 过期套餐querySet
  279. @param now_time 当前实际
  280. """
  281. today = datetime.datetime.today()
  282. year = today.year
  283. month = today.month
  284. iccid_list = []
  285. for item in combo_order_qs:
  286. try:
  287. icc_id = item['iccid']
  288. um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id)
  289. if not um_device_qs.exists():
  290. continue
  291. UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time)
  292. iccid_list.append(icc_id)
  293. logger.info('修改套餐已失效成功,iccid:{}'.format(icc_id))
  294. except Exception as e:
  295. logger.info('async_deactivate_expire_package套餐过期修改失效异常,{}'
  296. 'errLine:{}, errMsg:{}'.format(item['iccid'], e.__traceback__.tb_lineno, repr(e)))
  297. continue
  298. # set无序不重复元素集
  299. iccid_list = list(set(iccid_list))
  300. unicom_api = UnicomObjeect()
  301. for item in iccid_list:
  302. try:
  303. activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time,
  304. is_del=False).values()
  305. if activate_combo_qs.exists():
  306. continue
  307. usage_flow = unicom_api.get_flow_usage_total(item)
  308. # 查询是否有未使用套餐,有则进行激活 否 则调用API停卡
  309. result = UnicomComboTaskView().query_unused_combo_and_activate(item, year, month, usage_flow)
  310. if not result: # 没有可用套餐进行停卡
  311. # 停用设备
  312. unicom_api.change_device_to_disable(iccid=item, reason='没有可用套餐')
  313. logger.info('调用停卡API successful,iccid:{}'.format(item))
  314. combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \
  315. .values('id').order_by('-updated_time')
  316. combo_order = combo_order_info_qs.first()
  317. asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
  318. args=(item, combo_order['id'], 4))
  319. asy.start()
  320. else:
  321. unicom_api.change_device_to_activate(item)
  322. except Exception as e:
  323. logger.info('async_deactivate_expire_package套餐过期停卡异常,{}'
  324. 'errLine:{}, errMsg:{}'.format(item, e.__traceback__.tb_lineno, repr(e)))
  325. continue
  326. @staticmethod
  327. def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
  328. """
  329. 异步保存消息推送 激活|过期
  330. @param iccid:
  331. @param combo_order_id:
  332. @param push_type:
  333. @return:
  334. """
  335. try:
  336. now_time = int(time.time())
  337. ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id')
  338. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'],
  339. 'status': 0, 'type': push_type,
  340. 'updated_time': now_time,
  341. 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']}
  342. UnicomFlowPush.objects.create(**push_data)
  343. except Exception as e:
  344. logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  345. @staticmethod
  346. def unicom_flow_used(request_dict, response):
  347. """
  348. 查询设备每张卡流量使用情况
  349. @param request_dict:
  350. @param response:
  351. @return:
  352. """
  353. page_size = int(request_dict.get('pageSize', 1))
  354. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  355. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  356. for page_number in range(1, total_pages + 1):
  357. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  358. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  359. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  360. asy.start()
  361. return response.json(0)
  362. @staticmethod
  363. def thread_collect_flow_used(u_device_qs):
  364. for item in u_device_qs:
  365. try:
  366. unicom_api = UnicomObjeect()
  367. n_time = int(time.time())
  368. # 队列已使用总流量总量
  369. flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
  370. UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
  371. sim_used_flow=flow_total_usage)
  372. except Exception as e:
  373. print(repr(e))
  374. continue
  375. @classmethod
  376. def query_flow_used_history(cls, response):
  377. # 获取符合条件的卡片对象查询集,并按创建时间升序排序
  378. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
  379. if not card_qs.exists():
  380. return response.json(0)
  381. asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,))
  382. asy.start()
  383. return response.json(0)
  384. @staticmethod
  385. def async_bulk_create_usage_history(qs):
  386. """
  387. 异步批量创建流量用量历史记录
  388. """
  389. redis_obj = RedisObject()
  390. current_time = int(time.time()) # 获取当前时间戳
  391. for item in qs:
  392. iccid = item['iccid']
  393. key = 'monthly_flow_' + iccid
  394. flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据
  395. iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象
  396. for k, v in flow_dict.items():
  397. try:
  398. cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型
  399. flow = float(v)
  400. iot_card_list.append(IotCardUsageHistory(
  401. iccid=iccid,
  402. card_type=1,
  403. cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201
  404. flow_total_usage=flow,
  405. created_time=current_time,
  406. updated_time=current_time
  407. ))
  408. except Exception as e:
  409. print(repr(e))
  410. continue
  411. # 批量创建IotCardUsageHistory对象
  412. if iot_card_list:
  413. IotCardUsageHistory.objects.bulk_create(iot_card_list)
  414. @classmethod
  415. def query_flow_cache(cls, response):
  416. """
  417. 查询流量缓存永久的将设置过期时间为10分钟
  418. """
  419. redis = RedisObject()
  420. try:
  421. res = redis.get_keys('ASJ:UNICOM:FLOW:*')
  422. keys = [key.decode() for key in res]
  423. # 进行进一步的处理或打印
  424. for key in keys:
  425. ttl = redis.get_ttl(key)
  426. if ttl == -1:
  427. logger.info('iccidFlow:{}'.format(key))
  428. redis.CONN.expire(key, 60 * 10)
  429. return response.json(0)
  430. except Exception as e:
  431. logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  432. return response.json(177, repr(e))
  433. @staticmethod
  434. def get_device_usage_history(response):
  435. """
  436. 查询联通设备历史用量
  437. """
  438. card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid')
  439. if not card_qs.exists():
  440. return response.json(0)
  441. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history, args=(card_qs,))
  442. asy.start()
  443. return response.json(0)
  444. @staticmethod
  445. def async_update_device_usage_history(qs):
  446. """
  447. 异步更新设备用量历史
  448. @param qs: 联通iccid集合
  449. """
  450. try:
  451. u_service = UnicomObjeect()
  452. iot_card_list = []
  453. now_time = int(time.time())
  454. # 获取当前时间
  455. current_date = datetime.datetime.now()
  456. # 计算上个月的时间
  457. last_month_date = current_date - datetime.timedelta(days=current_date.day)
  458. # 例格式化日期为 "202307"
  459. formatted_date = last_month_date.strftime('%Y%m')
  460. for item in qs:
  461. params = {'iccid': item['iccid']}
  462. result = u_service.query_device_usage_history(**params)
  463. res_dict = u_service.get_text_dict(result)
  464. if res_dict['code'] == 0 and res_dict['data']:
  465. for cycle in res_dict['data']:
  466. if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != int(formatted_date):
  467. continue
  468. iot_card_list.append(IotCardUsageHistory(
  469. iccid=item['iccid'],
  470. card_type=1,
  471. cycle=cycle['cycle'],
  472. flow_total_usage=cycle['flowTotalUsage'],
  473. created_time=now_time,
  474. updated_time=now_time
  475. ))
  476. if not iot_card_list:
  477. return None
  478. # 使用Django的Paginator进行分页
  479. paginator = Paginator(iot_card_list, 300)
  480. for page_num in range(1, paginator.num_pages + 1):
  481. IotCardUsageHistory.objects.bulk_create(paginator.page(page_num).object_list)
  482. except Exception as e:
  483. logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  484. return None
  485. @staticmethod
  486. def update_device_cycle_flow(response):
  487. """
  488. 更新设备账期流量接口
  489. """
  490. card_qs = UnicomDeviceInfo.objects.filter(card_type=0, status=2).values('iccid')
  491. if not card_qs.exists():
  492. return response.json(0)
  493. logger.info('总数:{}'.format(card_qs.count()))
  494. asy = threading.Thread(target=UnicomComboTaskView.async_update_device_cycle_flow, args=(card_qs,))
  495. asy.start()
  496. return response.json(0)
  497. @staticmethod
  498. def async_update_device_cycle_flow(qs):
  499. """
  500. 异步更新设备账期流量
  501. """
  502. try:
  503. logger.info('进入异步更新设备卡账期流量~~~~')
  504. unicom_api = UnicomObjeect()
  505. for item in qs:
  506. try:
  507. unicom_api.get_flow_usage_total(item['iccid'])
  508. except Exception as e:
  509. print(repr(e))
  510. continue
  511. except Exception as e:
  512. logger.info('更新账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  513. return None
  514. @staticmethod
  515. def get_access_number_change_task(response):
  516. """
  517. 获取接入号码变更任务
  518. """
  519. task_qs = AccessNumberTaskQueue.objects.exclude(status=1).filter(count__lt=4).order_by('created_time')
  520. if not task_qs.exists():
  521. return response.json(0)
  522. logger.info('接入卡号变更任务总数:{}'.format(task_qs.count()))
  523. asy = threading.Thread(target=UnicomComboTaskView.async_update_access_number_status, args=(task_qs,))
  524. asy.start()
  525. return response.json(0)
  526. @staticmethod
  527. def async_update_access_number_status(qs):
  528. """
  529. 异步更新设备账期流量
  530. """
  531. try:
  532. logger.info('进入异步更新接入卡号网络状态~~~~')
  533. redis = RedisObject()
  534. iccid_list = []
  535. for item in qs:
  536. try:
  537. key = f'ASJ:TELECOM:CHANGE:{item.iccid}'
  538. change_data = redis.get_data(key)
  539. if change_data:
  540. iccid_list.append(item.iccid)
  541. continue
  542. if item.iccid in iccid_list: # 避免同一ICCID多次执行
  543. continue
  544. action_value = 'DEL' if item.action == 2 else 'ADD'
  545. result = TelecomObject().single_cut_net(item.access_number, action_value) # 变更网络状态
  546. now_time = int(time.time())
  547. count = item.count + 1
  548. cache_data = {'iccid': item.iccid, 'actionValue': action_value}
  549. if not result: # 失败修改执行次数
  550. iccid_list.append(item.iccid)
  551. data = {'status': 2, 'count': count, 'updated_time': now_time}
  552. AccessNumberTaskQueue.objects.filter(id=item.id).update(**data)
  553. redis.CONN.setnx(key, json.dumps(cache_data))
  554. redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态
  555. continue
  556. data = {'status': 1, 'count': count, 'completion_time': now_time, 'updated_time': now_time}
  557. if result == '-5': # 已符合变更后状态
  558. data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'}
  559. else:
  560. data['result'] = result
  561. iccid_list.append(item.iccid)
  562. AccessNumberTaskQueue.objects.filter(id=item.id).update(**data) # 成功后修改任务记录状态
  563. redis.CONN.setnx(key, json.dumps(cache_data))
  564. redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态
  565. except Exception as e:
  566. print(repr(e))
  567. continue
  568. except Exception as e:
  569. logger.info('异步变更卡网络状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))