UnicomComboTaskController.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : UnicomComboTaskController.py
  4. @Time : 2022/6/30 16:23
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from decimal import Decimal
  15. from django.core.paginator import Paginator
  16. from django.db import transaction
  17. from django.db.models import Q
  18. from django.views import View
  19. from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \
  20. IotCardUsageHistory, AccessNumberTaskQueue, IotCardOrderUsageHistory, OperatingCosts
  21. from Object.RedisObject import RedisObject
  22. from Object.ResponseObject import ResponseObject
  23. from Object.TelecomObject import TelecomObject
  24. from Object.UnicomObject import UnicomObjeect
  25. from Object.utils import LocalDateTimeUtil
  26. from Service.TelecomService import TelecomService
  27. logger = logging.getLogger('info')
  28. class UnicomComboTaskView(View):
  def get(self, request, *args, **kwargs):
      """Handle a GET request for one of the scheduled combo tasks.

      Sets the request encoding to UTF-8 and hands the query parameters, the
      request itself and the ``operation`` keyword argument (``None`` when it
      is absent) to :meth:`validation`.
      """
      request.encoding = 'utf-8'
      return self.validation(request.GET, request, kwargs.get('operation'))
  def post(self, request, *args, **kwargs):
      """Handle a POST request for one of the scheduled combo tasks.

      Sets the request encoding to UTF-8 and hands the form parameters, the
      request itself and the ``operation`` keyword argument (``None`` when it
      is absent) to :meth:`validation`.
      """
      request.encoding = 'utf-8'
      return self.validation(request.POST, request, kwargs.get('operation'))
  def validation(self, request_dict, request, operation):
      """Route a scheduled-task request to the handler selected by ``operation``.

      @param request_dict: request parameters (``request.GET`` or ``request.POST``)
      @param request: the incoming request object; only logged here
      @param operation: operation name passed in by ``get``/``post``
      @return: the handler's response, or ``response.json(414)`` when the
          operation name is not recognised
      """
      response = ResponseObject()
      # Trace the call through the project logger instead of a bare print().
      logger.info('UnicomComboTaskView request:{}, operation:{}'.format(request, operation))

      # Operations that need the request parameters are handled explicitly.
      if operation == 'check-activate':
          return self.check_activate_combo(request_dict, response)
      if operation == 'check-expire':
          today = datetime.datetime.today()
          # NOTE(review): the usage baseline is the hard-coded string '666';
          # this looks like a manual/debug entry point - confirm before relying on it.
          self.query_unused_combo_and_activate(request_dict.get('iccid'), today.year, today.month, '666')
          return response.json(0)
      if operation == 'updateFlowUsed':  # refresh the stored flow usage of each card
          # Return the handler's own response instead of discarding it and
          # building a second response.json(0).
          return self.unicom_flow_used(request_dict, response)

      # Every remaining operation only needs the response object.
      response_only_handlers = {
          'check-flow': self.check_flow_usage,
          'check-flow-expire': self.check_flow_expire,
          'queryFlowUsedHistory': self.query_flow_used_history,
          'queryFlowCache': self.query_flow_cache,
          'getDeviceUsageHistory': self.get_device_usage_history,
          'updateCardCycleFlow': self.update_device_cycle_flow,
          'executeAccessTask': self.get_access_number_change_task,
          'queryTotalTrafficToday': self.query_total_traffic_today,
          'createMonthlyCost': self.query_4G_order_info,
      }
      handler = response_only_handlers.get(operation)
      if handler is None:
          return response.json(414)
      return handler(response)
  @classmethod
  def check_activate_combo(cls, request_dict, response):
      """Scheduled check that activates combos flagged for next-month activation.

      Selects combo orders with ``status=0``, ``next_month_activate=True`` and
      an activation window that contains the current time. For each one whose
      order is paid (``Order_Model.status=1``), whose card has no combo in
      use (``status=1``) and whose combo definition exists, the card is
      switched to active through the Unicom API and the oldest pending combo
      of that card is activated with the card's current usage as baseline.

      @param request_dict: request parameters (not used by this task)
      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``, or ``response.json(177, <error>)`` on failure
      """
      logger.info('--->进入监控次月激活联通套餐')
      now_time = int(time.time())
      combo_order_info_qs = UnicomComboOrderInfo.objects.filter(
          status=0, next_month_activate=True, activation_time__lte=now_time,
          expire_time__gte=now_time, is_del=0).values()
      if not combo_order_info_qs.exists():
          return response.json(0)
      try:
          today = datetime.datetime.today()
          with transaction.atomic():
              unicom_api = UnicomObjeect()
              for item in combo_order_info_qs:
                  order_id = item['order_id']
                  if not order_id:
                      continue
                  iccid = item['iccid']
                  if not Order_Model.objects.filter(orderID=order_id, status=1).exists():
                      continue
                  # Skip cards that already have a combo in use.
                  if UnicomComboOrderInfo.objects.filter(status=1, iccid=iccid).exists():
                      continue
                  if not UnicomCombo.objects.filter(id=item['combo_id']).exists():
                      continue
                  # Current usage of the card becomes the baseline of the new combo.
                  flow_total_usage = unicom_api.get_flow_usage_total(iccid)
                  flow_total_usage = Decimal(flow_total_usage).quantize(
                      Decimal('0.00')) if flow_total_usage > 0 else 0
                  flow_total_usage = str(flow_total_usage)
                  # Make sure the card itself is active before activating the combo.
                  unicom_api.change_device_to_activate(iccid)
                  activated = cls.query_unused_combo_and_activate(iccid, today.year, today.month,
                                                                  flow_total_usage)
                  # Only report success when the activation actually happened.
                  if activated:
                      logger.info('激活成功,订单编号:{}'.format(order_id))
                  else:
                      logger.info('激活失败,订单编号:{}'.format(order_id))
          return response.json(0)
      except Exception as e:
          logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
          return response.json(177, repr(e))
  @classmethod
  def check_flow_usage(cls, response):
      """Kick off background monitoring of flow usage for limited combos.

      Looks up combo orders that are in use (``status=1``), not deleted and
      whose combo is not unlimited; when any exist they are handed to
      :meth:`async_monitoring_flow` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``, or ``response.json(177, <error>)`` on failure
      """
      logger.info('--->进入监控流量使用情况')
      try:
          active_orders = UnicomComboOrderInfo.objects.filter(
              status=1, is_del=False, combo__is_unlimited=0).values()
          if active_orders.exists():
              worker = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow,
                                        args=(active_orders,))
              worker.start()
          return response.json(0)
      except Exception as e:
          logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
          return response.json(177, repr(e))
  @classmethod
  def async_monitoring_flow(cls, combo_order_qs):
      """Background worker: compare each active combo's quota with the card's usage.

      For every order row the card's total usage is read from the operator
      API. The quota limit is the usage recorded when the combo was activated
      plus the combo's ``flow_total``. A usage warning is recorded through
      :meth:`flow_warning_push`; once the limit is reached the order is marked
      ``status=2`` with the exceeded amount, the next pending combo is
      activated if there is one, and otherwise the card is cut off (telecom
      cards with ``card_type == 3`` via TelecomService, all others via the
      Unicom API).

      Each order is processed in its own try/except so that one failing card
      no longer stops the monitoring of the remaining cards.

      @param combo_order_qs: iterable of order dicts (``.values()`` rows)
      """
      try:
          unicom_api = UnicomObjeect()
          today = datetime.datetime.today()
          now_time = int(time.time())
          for item in combo_order_qs:
              try:
                  iccid = item['iccid']
                  device = UnicomDeviceInfo.objects.filter(iccid=iccid).first()
                  if device is None:
                      continue
                  card_type = device.card_type
                  activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0
                  combo = UnicomCombo.objects.filter(id=item['combo_id']).values().first()
                  if combo is None:
                      continue
                  flow_total = combo['flow_total']
                  # Total usage currently reported for the card.
                  flow_total_usage = float(unicom_api.get_flow_usage_total(iccid, card_type))
                  # Usage at activation time + quota of the current combo.
                  flow = activate_usage_flow + flow_total
                  is_expire = False
                  if flow_total_usage > 0:
                      if flow_total_usage >= flow:
                          is_expire = True
                      usage = (flow_total_usage - activate_usage_flow) \
                          if flow_total_usage > activate_usage_flow else 0
                      cls.flow_warning_push(device.user_id, device.serial_no, item['id'], flow_total, usage)
                  if not is_expire:
                      continue
                  # Quota used up: close this order, then try the next pending combo.
                  UnicomComboOrderInfo.objects.filter(id=item['id']) \
                      .update(status=2, updated_time=now_time, flow_exceed=flow_total_usage - flow)
                  activate_status = cls.query_unused_combo_and_activate(iccid, today.year, today.month,
                                                                        flow_total_usage)
                  logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status))
                  if activate_status:
                      continue
                  # Nothing left to activate: disable the card / cut its network.
                  if card_type == 3:  # telecom card
                      TelecomService().update_access_number_network(iccid, device.access_number, 'ADD',
                                                                    '套餐流量已用完')
                  else:
                      unicom_api.change_device_to_disable(iccid=iccid, reason='套餐流量已用完')
              except Exception as e:
                  logger.info('异步~检测流量用量详情异常,iccid:{},errLine:{}, errMsg:{}'.format(
                      item.get('iccid'), e.__traceback__.tb_lineno, repr(e)))
      except Exception as e:
          logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  191. @staticmethod
  192. def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage):
  193. """
  194. 监控流量使用大于85%and小于96%进行消息推送提醒
  195. @param app_user_id: app用户id
  196. @param serial_no: 序列号
  197. @param combo_order_id: 当前套餐订单id
  198. @param flow_total: 套餐流量总量
  199. @param flow_usage: 套餐已使用流量
  200. @return:
  201. """
  202. try:
  203. if not app_user_id:
  204. return False
  205. now_time = int(time.time())
  206. push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no,
  207. 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0,
  208. 'updated_time': now_time,
  209. 'created_time': now_time, 'user_id': app_user_id}
  210. if 0 < flow_total and 0 < flow_usage < flow_total:
  211. res = flow_usage / flow_total * 100
  212. if 85 < res <= 95:
  213. flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id)
  214. if not flow_push.exists():
  215. UnicomFlowPush.objects.create(**push_data)
  216. elif flow_usage >= flow_total:
  217. push_data['flow_total_usage'] = flow_total
  218. push_data['type'] = 1
  219. UnicomFlowPush.objects.create(**push_data)
  220. return True
  221. except Exception as e:
  222. logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  @staticmethod
  def query_unused_combo_and_activate(iccid, year, month, usage_flow):
      """Activate the oldest pending combo order of a card, if there is one.

      A pending order has ``status=0`` and an activation window that contains
      the current time. It is activated only when it carries an order id and
      the matching ``Order_Model`` row has ``status=1``. On activation the
      order is updated (status 1, year, month, usage baseline, timestamps)
      and a type-3 push record is written on a separate thread.

      @param iccid: card ICCID
      @param year: year stored on the activated order
      @param month: month stored on the activated order
      @param usage_flow: card usage at activation time, stored as a string
      @return: True when an order was activated; False otherwise, including
          when an exception was caught and logged
      """
      try:
          stamp = int(time.time())
          pending_qs = UnicomComboOrderInfo.objects.filter(
              expire_time__gt=stamp, activation_time__lte=stamp, status=0, iccid=iccid,
          ).order_by('created_time')
          if not pending_qs.exists():
              return False
          pending = pending_qs.first()
          if not pending.order_id:
              return False
          paid = Order_Model.objects.filter(orderID=pending.order_id, status=1).exists()
          if not paid:
              return False
          UnicomComboOrderInfo.objects.filter(id=pending.id).update(
              status=1,
              year=year,
              month=month,
              flow_total_usage=str(usage_flow),
              activation_time=stamp,
              updated_time=stamp,
          )
          notifier = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
                                      args=(iccid, pending.id, 3))
          notifier.start()
          return True
      except Exception as e:
          logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
          return False
  @classmethod
  def check_flow_expire(cls, response):
      """Kick off background handling of combo orders whose expiry time has passed.

      Selects non-deleted orders whose ``expire_time`` is not later than now
      and whose status is not 2, and hands them to
      :meth:`async_deactivate_expire_package` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      logger.info('check_flow_expire进入监控流量到期停卡或激活叠加包')
      stamp = int(time.time())
      expired_orders = UnicomComboOrderInfo.objects.filter(
          ~Q(status=2), expire_time__lte=stamp, is_del=False).values()
      if expired_orders.exists():
          worker = threading.Thread(target=UnicomComboTaskView.async_deactivate_expire_package,
                                    args=(expired_orders, stamp))
          worker.start()
      return response.json(0)
  @staticmethod
  def async_deactivate_expire_package(combo_order_qs, now_time):
      """Background worker: invalidate expired combos, then activate or disable cards.

      Pass 1 marks every expired order whose card exists in UnicomDeviceInfo
      as ``status=2`` and remembers the card. Pass 2 visits each remembered
      card once: cards that still have an unexpired combo in use are skipped;
      otherwise the next pending combo is activated and the card re-enabled,
      or, when nothing can be activated, the card is disabled and a type-4
      push record is written for its most recently updated ``status=2`` order.

      @param combo_order_qs: expired combo orders (``.values()`` rows)
      @param now_time: current time as a Unix timestamp
      """
      current = datetime.datetime.today()
      touched_iccids = []
      for order in combo_order_qs:
          try:
              card_iccid = order['iccid']
              if UnicomDeviceInfo.objects.filter(iccid=card_iccid).exists():
                  UnicomComboOrderInfo.objects.filter(id=order['id']).update(status=2, updated_time=now_time)
                  touched_iccids.append(card_iccid)
                  logger.info('修改套餐已失效成功,iccid:{}'.format(card_iccid))
          except Exception as e:
              logger.info('async_deactivate_expire_package套餐过期修改失效异常,{}'
                          'errLine:{}, errMsg:{}'.format(order['iccid'], e.__traceback__.tb_lineno, repr(e)))

      # Each card is handled once, however many of its orders expired.
      unique_iccids = list(set(touched_iccids))
      unicom_api = UnicomObjeect()
      for card_iccid in unique_iccids:
          try:
              still_in_use = UnicomComboOrderInfo.objects.filter(
                  iccid=card_iccid, status=1, expire_time__gt=now_time, is_del=False).values().exists()
              if still_in_use:
                  continue
              usage_flow = unicom_api.get_flow_usage_total(card_iccid)
              # Activate a pending combo when there is one; otherwise disable the card.
              activated = UnicomComboTaskView().query_unused_combo_and_activate(
                  card_iccid, current.year, current.month, usage_flow)
              if activated:
                  unicom_api.change_device_to_activate(card_iccid)
                  continue
              unicom_api.change_device_to_disable(iccid=card_iccid, reason='没有可用套餐')
              logger.info('调用停卡API successful,iccid:{}'.format(card_iccid))
              latest_expired = UnicomComboOrderInfo.objects.filter(iccid=card_iccid, status=2) \
                  .values('id').order_by('-updated_time').first()
              notifier = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push,
                                          args=(card_iccid, latest_expired['id'], 4))
              notifier.start()
          except Exception as e:
              logger.info('async_deactivate_expire_package套餐过期停卡异常,{}'
                          'errLine:{}, errMsg:{}'.format(card_iccid, e.__traceback__.tb_lineno, repr(e)))
  @staticmethod
  def async_combo_sys_msg_push(iccid, combo_order_id, push_type):
      """Store a push record for a combo event (callers in this class use 3 and 4).

      The device row is now read once (previously ``.first()`` ran twice), and
      a card without a device record is reported explicitly instead of
      surfacing as a ``TypeError`` on ``None``.

      @param iccid: card ICCID used to look up the device's serial number and user
      @param combo_order_id: id of the combo order the message refers to
      @param push_type: value stored in the record's ``type`` field
      @return: None
      """
      try:
          device = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id').first()
          if device is None:
              logger.info('-->出错了~,未找到设备信息,iccid:{}'.format(iccid))
              return
          now_time = int(time.time())
          UnicomFlowPush.objects.create(
              combo_order_id=str(combo_order_id),
              serial_no=device['serial_no'],
              status=0,
              type=push_type,
              updated_time=now_time,
              created_time=now_time,
              user_id=device['user_id'],
          )
      except Exception as e:
          logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  350. @staticmethod
  351. def unicom_flow_used(request_dict, response):
  352. """
  353. 查询设备每张卡流量使用情况
  354. @param request_dict:
  355. @param response:
  356. @return:
  357. """
  358. page_size = int(request_dict.get('pageSize', 1))
  359. device_count = UnicomDeviceInfo.objects.filter(card_type=0).count()
  360. total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数
  361. for page_number in range(1, total_pages + 1):
  362. u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by(
  363. '-created_time')[(page_number - 1) * page_size:page_number * page_size]
  364. asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,))
  365. asy.start()
  366. return response.json(0)
  @staticmethod
  def thread_collect_flow_used(u_device_qs):
      """Thread target: store the current total flow usage of each device row.

      For every row the card's total usage is read from the Unicom API and
      written to ``sim_used_flow`` together with ``updated_time``. A failure
      for one card is logged (it used to be printed to stdout) and the loop
      moves on.

      @param u_device_qs: iterable of dicts with at least ``id`` and ``iccid``
      """
      # One API client is enough for the whole page.
      unicom_api = UnicomObjeect()
      for item in u_device_qs:
          try:
              n_time = int(time.time())
              flow_total_usage = unicom_api.get_flow_usage_total(item['iccid'])
              UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time,
                                                                    sim_used_flow=flow_total_usage)
          except Exception as e:
              logger.error('更新卡流量使用异常,iccid:{},errLine:{}, errMsg:{}'.format(
                  item.get('iccid'), e.__traceback__.tb_lineno, repr(e)))
              continue
  @classmethod
  def query_flow_used_history(cls, response):
      """Start a background import of monthly flow history for ``card_type=0`` cards.

      The cards' ICCIDs, oldest ``created_time`` first, are handed to
      :meth:`async_bulk_create_usage_history` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      cards = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time')
      if cards.exists():
          worker = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(cards,))
          worker.start()
      return response.json(0)
  @staticmethod
  def async_bulk_create_usage_history(qs):
      """Background worker: copy cached monthly usage from Redis into IotCardUsageHistory.

      For each card the Redis hash ``monthly_flow_<iccid>`` is read; every
      field is parsed as a ``YYYY-MM`` cycle with a numeric usage value and
      turned into a history row (cycle stored as the integer ``YYYYMM``). Rows
      of one card are inserted with a single ``bulk_create``.

      Failures are written to the logger (they used to be printed), and an
      error while reading or saving one card no longer stops the remaining
      cards.

      @param qs: iterable of dicts with an ``iccid`` key
      """
      redis_obj = RedisObject()
      current_time = int(time.time())
      for item in qs:
          iccid = item['iccid']
          try:
              flow_dict = redis_obj.get_all_hash_data('monthly_flow_' + iccid)
              iot_card_list = []
              for k, v in flow_dict.items():
                  try:
                      # Hash fields are expected as bytes such as b'2022-01'.
                      cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m')
                      iot_card_list.append(IotCardUsageHistory(
                          iccid=iccid,
                          card_type=1,
                          cycle=int(cycle.strftime('%Y%m')),  # e.g. 202201
                          flow_total_usage=float(v),
                          created_time=current_time,
                          updated_time=current_time
                      ))
                  except Exception as e:
                      logger.error('解析流量历史记录异常,iccid:{},cycle:{},errMsg:{}'.format(iccid, k, repr(e)))
                      continue
              if iot_card_list:
                  IotCardUsageHistory.objects.bulk_create(iot_card_list)
          except Exception as e:
              logger.error('保存流量历史记录异常,iccid:{},errLine:{}, errMsg:{}'.format(
                  iccid, e.__traceback__.tb_lineno, repr(e)))
  @classmethod
  def query_flow_cache(cls, response):
      """Give flow cache keys that never expire a 10-minute expiry.

      Scans the keys matching ``ASJ:UNICOM:FLOW:*``; every key whose reported
      TTL is ``-1`` is logged and set to expire in 600 seconds.

      The error log used to carry the message of the next-month activation
      task (copy-paste); it now names this task.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``, or ``response.json(177, <error>)`` on failure
      """
      redis = RedisObject()
      try:
          res = redis.get_keys('ASJ:UNICOM:FLOW:*')
          keys = [key.decode() for key in res]
          for key in keys:
              if redis.get_ttl(key) == -1:
                  logger.info('iccidFlow:{}'.format(key))
                  redis.CONN.expire(key, 60 * 10)
          return response.json(0)
      except Exception as e:
          logger.info('出错了~查询流量缓存异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
          return response.json(177, repr(e))
  @staticmethod
  def get_device_usage_history(response):
      """Start a background fetch of usage history for all ``card_type=0`` cards.

      The cards' ICCIDs are handed to
      :meth:`async_update_device_usage_history` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      cards = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid')
      if cards.exists():
          worker = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history,
                                    args=(cards,))
          worker.start()
      return response.json(0)
  @staticmethod
  def async_update_device_usage_history(qs):
      """Background worker: store last month's usage of each card.

      The usage history of every card is requested from the Unicom API; only
      entries with positive usage whose cycle equals last month (``YYYYMM``)
      are kept. The collected rows are inserted in chunks of 300.

      A failing API call for one card is now logged and skipped; previously
      the first failure aborted the run and discarded everything collected so
      far.

      @param qs: iterable of dicts with an ``iccid`` key
      @return: None
      """
      try:
          u_service = UnicomObjeect()
          now_time = int(time.time())
          current_date = datetime.datetime.now()
          # Subtracting today's day-of-month lands on the last day of the previous month.
          last_month_date = current_date - datetime.timedelta(days=current_date.day)
          last_cycle = int(last_month_date.strftime('%Y%m'))  # e.g. 202307

          iot_card_list = []
          for item in qs:
              try:
                  result = u_service.query_device_usage_history(iccid=item['iccid'])
                  res_dict = u_service.get_text_dict(result)
                  if not (res_dict['code'] == 0 and res_dict['data']):
                      continue
                  for cycle in res_dict['data']:
                      if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != last_cycle:
                          continue
                      iot_card_list.append(IotCardUsageHistory(
                          iccid=item['iccid'],
                          card_type=1,
                          cycle=cycle['cycle'],
                          flow_total_usage=cycle['flowTotalUsage'],
                          created_time=now_time,
                          updated_time=now_time
                      ))
              except Exception as e:
                  logger.info('查询账期流量异常,iccid:{},errLine:{}, errMsg:{}'.format(
                      item.get('iccid'), e.__traceback__.tb_lineno, repr(e)))
                  continue

          # Insert in chunks of 300 rows.
          chunk_size = 300
          for start in range(0, len(iot_card_list), chunk_size):
              IotCardUsageHistory.objects.bulk_create(iot_card_list[start:start + chunk_size])
      except Exception as e:
          logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
      return None
  @staticmethod
  def update_device_cycle_flow(response):
      """Start a background refresh of cycle flow for ``card_type=0, status=2`` cards.

      Logs how many cards matched and hands their ICCIDs to
      :meth:`async_update_device_cycle_flow` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      cards = UnicomDeviceInfo.objects.filter(card_type=0, status=2).values('iccid')
      if cards.exists():
          logger.info('总数:{}'.format(cards.count()))
          worker = threading.Thread(target=UnicomComboTaskView.async_update_device_cycle_flow,
                                    args=(cards,))
          worker.start()
      return response.json(0)
  @staticmethod
  def async_update_device_cycle_flow(qs):
      """Background worker: query the total flow usage of each card.

      The value returned by ``get_flow_usage_total`` is not used here.
      NOTE(review): the call is presumably made for a side effect inside
      UnicomObjeect (e.g. refreshing a cache) - confirm.

      Per-card failures are written to the logger instead of being printed.

      @param qs: iterable of dicts with an ``iccid`` key
      @return: None
      """
      try:
          logger.info('进入异步更新设备卡账期流量~~~~')
          unicom_api = UnicomObjeect()
          for item in qs:
              try:
                  unicom_api.get_flow_usage_total(item['iccid'])
              except Exception as e:
                  logger.error('更新账期流量异常,iccid:{},errMsg:{}'.format(item.get('iccid'), repr(e)))
                  continue
      except Exception as e:
          logger.info('更新账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
      return None
  @staticmethod
  def get_access_number_change_task(response):
      """Start background execution of queued access-number change tasks.

      Picks the tasks whose status is not 1 and whose ``count`` is below 4,
      oldest first, logs how many there are and hands them to
      :meth:`async_update_access_number_status` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      pending_tasks = AccessNumberTaskQueue.objects.exclude(status=1) \
          .filter(count__lt=4).order_by('created_time')
      if pending_tasks.exists():
          logger.info('接入卡号变更任务总数:{}'.format(pending_tasks.count()))
          worker = threading.Thread(target=UnicomComboTaskView.async_update_access_number_status,
                                    args=(pending_tasks,))
          worker.start()
      return response.json(0)
  @staticmethod
  def async_update_access_number_status(qs):
      """Background worker: execute queued network-status changes for access numbers.

      (The previous docstring described a different task.)

      For each task, unless its card was already handled in this run or has a
      cooldown key in Redis, the network state is changed through
      ``TelecomObject.single_cut_net`` ('DEL' when ``action == 2``, otherwise
      'ADD'). A falsy result marks the task ``status=2``; any other result
      marks it ``status=1`` and stores the result. In both cases the attempt
      counter is incremented and a 130-second cooldown key is written for the
      card.

      @param qs: iterable of AccessNumberTaskQueue rows
      """
      try:
          logger.info('进入异步更新接入卡号网络状态~~~~')
          redis = RedisObject()
          handled_iccids = set()
          for item in qs:
              try:
                  key = f'ASJ:TELECOM:CHANGE:{item.iccid}'
                  if redis.get_data(key):
                      # Still in cooldown from an earlier change.
                      handled_iccids.add(item.iccid)
                      continue
                  if item.iccid in handled_iccids:  # run each ICCID at most once per pass
                      continue
                  action_value = 'DEL' if item.action == 2 else 'ADD'
                  result = TelecomObject().single_cut_net(item.access_number, action_value)
                  now_time = int(time.time())
                  count = item.count + 1
                  handled_iccids.add(item.iccid)
                  if not result:
                      # Failed: only record the attempt.
                      data = {'status': 2, 'count': count, 'updated_time': now_time}
                  else:
                      data = {'status': 1, 'count': count, 'completion_time': now_time,
                              'updated_time': now_time}
                      if result == '-5':  # card is already in the requested state
                          data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'}
                      else:
                          data['result'] = result
                  AccessNumberTaskQueue.objects.filter(id=item.id).update(**data)
                  # Do not change this ICCID's network state again for 130 seconds.
                  cache_data = {'iccid': item.iccid, 'actionValue': action_value}
                  redis.CONN.setnx(key, json.dumps(cache_data))
                  redis.CONN.expire(key, 130)
              except Exception as e:
                  logger.error('异步变更卡网络状态异常,iccid:{},errLine:{}, errMsg:{}'.format(
                      item.iccid, e.__traceback__.tb_lineno, repr(e)))
                  continue
      except Exception as e:
          logger.info('异步变更卡网络状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  @staticmethod
  def query_total_traffic_today(response):
      """Start the background job that records today's traffic of 4G orders.

      Runs :meth:`async_save_today_use_float` on a new thread.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      worker = threading.Thread(target=UnicomComboTaskView.async_save_today_use_float)
      worker.start()
      logger.info('查询4G订单总消耗流量记录')
      return response.json(0)
  @staticmethod
  def async_save_today_use_float():
      """Background worker: record how much flow each in-use order has consumed.

      Considers orders in use (``status=1``) with ``year >= 2024``, created on
      or after timestamp 1714492800 and before the start of today, excluding
      combos of type 4 (free test flow according to the original comment). For
      each one the card's current total usage is read; when it is non-zero and
      differs from the usage stored on the order, the difference is saved as
      an IotCardOrderUsageHistory row stamped with today's date (``cycle`` as
      YYYYMMDD, ``cycle_date`` as the day of month). Rows are inserted in
      batches of 300.
      """
      now = datetime.datetime.today()
      cycle_text = now.strftime('%Y%m%d')
      start_time, _ = LocalDateTimeUtil.get_start_and_end_time(now.strftime('%Y-%m-%d'), '%Y-%m-%d')
      active_orders = UnicomComboOrderInfo.objects \
          .filter(status=1, year__gte=2024, created_time__gte=1714492800, created_time__lt=start_time) \
          .exclude(combo__combo_type=4) \
          .values('order_id', 'iccid', 'flow_total_usage')
      try:
          unicom_api = UnicomObjeect()
          if not active_orders.exists():
              return
          stamp = int(time.time())
          day_text = cycle_text[-2:]
          batch = []
          for order in active_orders:
              iccid = order['iccid']
              try:
                  current_flow = float(unicom_api.get_flow_usage_total(iccid))
                  if current_flow == 0:
                      continue
                  baseline_flow = float(order['flow_total_usage'])
                  if current_flow == baseline_flow:
                      continue
                  consumed = Decimal(current_flow - baseline_flow).quantize(Decimal('0.00'))
                  batch.append(IotCardOrderUsageHistory(
                      iccid=iccid,
                      order_id=order['order_id'],
                      card_type=1,
                      cycle=int(cycle_text),  # today's date as an integer, YYYYMMDD
                      cycle_date=int(day_text),
                      total_traffic=consumed,
                      created_time=stamp,
                      updated_time=stamp
                  ))
                  if len(batch) >= 300:
                      IotCardOrderUsageHistory.objects.bulk_create(batch)
                      batch = []
              except Exception as e:
                  logger.error(
                      '查询日用量异常iccid{},errLine:{}, errMsg:{}'.format(iccid, e.__traceback__.tb_lineno, repr(e)))
          if batch:
              IotCardOrderUsageHistory.objects.bulk_create(batch)
      except Exception as e:
          logger.error('统计4G卡日用量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  @classmethod
  def query_4G_order_info(cls, response):
      """Start the background jobs that build this month's 4G cost statement.

      Orders are selected when they expire on or after the start of the
      current month and were created on or after timestamp 1714492800 (the
      original comment refers to orders created after May 1st). Combo ids 24,
      26 and 34 are excluded. Combos shown in the app and combos not shown are
      processed by two separate threads running
      :meth:`created_flow_monthly_cost`.

      The two copy-pasted query blocks are now one loop, and the debug
      ``print`` calls are replaced by log entries.

      @param response: ResponseObject used to build the reply
      @return: ``response.json(0)``
      """
      first_timestamp, last_timestamp = cls.get_current_month_range()
      for is_show in (True, False):
          u_order_qs = UnicomComboOrderInfo.objects \
              .filter(expire_time__gte=first_timestamp, combo__is_show=is_show,
                      created_time__gte=1714492800) \
              .exclude(combo_id__in=[24, 26, 34]) \
              .values('status', 'combo__expiration_days', 'flow_total_usage',
                      'combo__flow_total', 'order_id',
                      'activation_time', 'expire_time', 'created_time', 'iccid') \
              .order_by('created_time')
          asy = threading.Thread(target=UnicomComboTaskView.created_flow_monthly_cost,
                                 args=(u_order_qs, first_timestamp, last_timestamp))
          asy.start()
          logger.info('createMonthlyCost is_show:{}, 订单数:{}'.format(is_show, u_order_qs.count()))
      return response.json(0)
  @classmethod
  def created_flow_monthly_cost(cls, u_order_qs, first_timestamp, last_timestamp):
      """Build and store one OperatingCosts row per paid 4G combo order.

      For every combo order that has a paid ``Order_Model`` row (``status=1``)
      the method computes the settlement days inside the month, the income
      share per day and per month, the card cost (monthly rent of 2 split
      across the card's combos, plus a usage cost when usage history exists),
      profit, profit margin and the fee (0.54% of the price). Rows are
      inserted in batches of 300.

      Changes compared with the previous version:
      * usage history is ordered by ``cycle`` (YYYYMMDD) instead of
        ``cycle_date`` (day of month), so the baseline taken from before the
        month is the most recent record rather than the one with the largest
        day number;
      * the paid order is fetched once instead of once per field;
      * the start-up ``print`` goes to the logger, and the error log names
        this task and the affected order.

      @param u_order_qs: combo order rows (``.values()`` dicts)
      @param first_timestamp: start of the settlement month (Unix timestamp)
      @param last_timestamp: settlement time on the last day of the month (Unix timestamp)
      @return: True
      """
      logger.info('开始了{}'.format(datetime.datetime.now()))
      fee_rate = 0.0054  # share of the order price deducted as fee
      monthly_card_rent = 2  # monthly rent per Unicom 4G card (from the original comment)
      excluded_combo_ids = [24, 26, 34]
      first_date = int(datetime.datetime.fromtimestamp(first_timestamp).strftime('%Y%m%d'))
      month_time = datetime.datetime.fromtimestamp(last_timestamp)
      costs_list = []
      for item in u_order_qs:
          try:
              # Only orders with a paid Order_Model row are billed.
              order_rows = list(
                  Order_Model.objects.filter(orderID=item['order_id'], status=1)
                  .values('orderID', 'UID', 'payTime', 'price', 'store_meal_name')[:1])
              if not order_rows:
                  continue
              order = order_rows[0]
              price = float(order['price'])
              expiration_days = int(item['combo__expiration_days'])
              # Income share per day: order price / total days of the combo.
              day_average_price = price / expiration_days if price > 0 else 0

              # Days left after the settlement time.
              expire_time = datetime.datetime.fromtimestamp(item['expire_time'])
              days_diff = (expire_time - month_time).days
              # Settlement starts at the later of order creation and month start.
              c_time = datetime.datetime.fromtimestamp(max(item['created_time'], first_timestamp))
              settlement_days = max((month_time - c_time).days + 1, 0)

              costs_vo = {
                  'settlement_days': settlement_days,
                  'order_id': order['orderID'],
                  'uid': order['UID'],
                  'day_average_price': Decimal(day_average_price).quantize(Decimal('0.0000')),
                  'purchase_quantity': item['combo__flow_total'],
                  'start_time': item['created_time'],
                  'end_time': item['expire_time'],
                  'created_time': last_timestamp,
                  'remaining_usage_time': days_diff,
                  'time': first_timestamp,
                  'country_name': '中国',
                  'order_type': '4G',
                  'region': '国内',
                  'price': order['price']
              }
              if settlement_days > 0 and price > 0:
                  # Net income / total combo days * settlement days.
                  costs_vo['monthly_income'] = Decimal(
                      (price - price * fee_rate) / expiration_days * settlement_days
                  ).quantize(Decimal('0.0000'))
              # Income share per day * settlement days.
              month_average_price = day_average_price * settlement_days if day_average_price > 0 else 0
              costs_vo['month_average_price'] = Decimal(month_average_price).quantize(Decimal('0.00'))

              # The card's monthly rent is split across its combos that are still valid this month.
              combo_count = UnicomComboOrderInfo.objects.filter(
                  iccid=item['iccid'], expire_time__gte=first_timestamp
              ).exclude(combo_id__in=excluded_combo_ids).count()
              cost = monthly_card_rent / combo_count if combo_count else monthly_card_rent

              history = IotCardOrderUsageHistory.objects.filter(order_id=item['order_id'])
              latest = history.filter(cycle__gte=first_date).order_by('-cycle').first()
              if latest is not None:
                  previous = history.filter(cycle__lt=first_date).order_by('-cycle').first()
                  old_use_flow = float(previous.total_traffic if previous is not None else 0)
                  n_user_flow = float(latest.total_traffic)
                  # Usage inside the month = latest total - last total before the month.
                  actual_flow = n_user_flow - old_use_flow if n_user_flow > 0 else 0
                  if actual_flow > 0:
                      # NOTE(review): presumably usage is in MB and the rate is 0.4 per GB - confirm.
                      cost = cost + actual_flow / 1024 * 0.4
                  costs_vo['actual_flow'] = Decimal(actual_flow).quantize(Decimal('0.00'))

              profit = 0
              profit_margin = 0
              if 'monthly_income' in costs_vo:
                  profit = float(costs_vo['monthly_income']) - cost
                  profit_margin = profit / cost * 100

              costs_vo['flow_cost'] = Decimal(cost).quantize(Decimal('0.00'))
              costs_vo['profit'] = Decimal(profit).quantize(Decimal('0.00'))
              costs_vo['profit_margin'] = Decimal(profit_margin).quantize(Decimal('0.00'))
              fee = price * fee_rate if price > 0 else 0
              costs_vo['fee'] = Decimal(fee).quantize(Decimal('0.00'))
              costs_vo['real_income'] = Decimal(price - fee).quantize(Decimal('0.00'))
              costs_vo['expire'] = order['store_meal_name']
              costs_list.append(OperatingCosts(**costs_vo))
              # Insert in batches of 300 rows.
              if len(costs_list) >= 300:
                  OperatingCosts.objects.bulk_create(costs_list)
                  costs_list = []
          except Exception as e:
              logger.error('生成4G月度成本账单异常,order_id:{},errLine:{}, errMsg:{}'.format(
                  item.get('order_id'), e.__traceback__.tb_lineno, repr(e)))
              continue
      # Insert whatever is left.
      if costs_list:
          OperatingCosts.objects.bulk_create(costs_list)
      return True
  774. @staticmethod
  775. def get_current_month_range():
  776. # 获取当前日期
  777. today = datetime.datetime.today()
  778. # 获取当前月份第一天
  779. first_day = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
  780. # 获取下个月第一天
  781. next_month = today.replace(day=28) + datetime.timedelta(days=4) # 跳过了2月份的28号日期
  782. last_day = next_month - datetime.timedelta(days=next_month.day)
  783. # 转换为时间戳
  784. first_timestamp = int(first_day.timestamp())
  785. last_timestamp = int(last_day.timestamp())
  786. return first_timestamp, last_timestamp