# -*- encoding: utf-8 -*- """ @File : UnicomComboTaskController.py @Time : 2022/6/30 16:23 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import datetime import json import logging import threading import time from decimal import Decimal from django.core.paginator import Paginator from django.db import transaction from django.db.models import Q from django.views import View from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush, \ IotCardUsageHistory, AccessNumberTaskQueue, IotCardOrderUsageHistory from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject from Object.TelecomObject import TelecomObject from Object.UnicomObject import UnicomObjeect from Object.utils import LocalDateTimeUtil from Service.TelecomService import TelecomService logger = logging.getLogger('info') class UnicomComboTaskView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() print(request) if operation == 'check-activate': return self.check_activate_combo(request_dict, response) elif operation == 'check-flow': return self.check_flow_usage(response) elif operation == 'check-flow-expire': return self.check_flow_expire(response) elif operation == 'check-expire': today = datetime.datetime.today() year = today.year month = today.month self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666') return response.json(0) elif operation == 'updateFlowUsed': # 更新流量使用 self.unicom_flow_used(request_dict, response) return response.json(0) elif operation == 'queryFlowUsedHistory': return self.query_flow_used_history(response) elif operation == 'queryFlowCache': return self.query_flow_cache(response) elif operation == 'getDeviceUsageHistory': return self.get_device_usage_history(response) elif operation == 'updateCardCycleFlow': return self.update_device_cycle_flow(response) elif operation == 'executeAccessTask': return self.get_access_number_change_task(response) elif operation == 'queryTotalTrafficToday': return self.query_total_traffic_today(response) else: return response.json(414) @classmethod def check_activate_combo(cls, request_dict, response): """ 定时检查是否有次月激活套餐 @param request_dict: @param response: @return: """ print(request_dict) logger.info('--->进入监控次月激活联通套餐') now_time = int(time.time()) combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True, activation_time__lte=now_time, expire_time__gte=now_time, is_del=0).values() if not combo_order_info_qs.exists(): return response.json(0) try: today = datetime.datetime.today() year = today.year month = today.month with transaction.atomic(): unicom_api = UnicomObjeect() for item in combo_order_info_qs: if item['order_id']: order_id = item['order_id'] order_qs = Order_Model.objects.filter(orderID=order_id, status=1) if not order_qs.exists(): continue combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid']) # 当前已有套餐正在使用则跳出当前循环 if combo_order_qs.exists(): continue combo_id = item['combo_id'] combo_qs = UnicomCombo.objects.filter(id=combo_id).values() if not combo_qs.exists(): continue # 查询当月用量情况 flow_total_usage = unicom_api.get_flow_usage_total(item['iccid']) flow_total_usage = Decimal(flow_total_usage).quantize( Decimal('0.00')) if flow_total_usage > 0 else 0 flow_total_usage = str(flow_total_usage) iccid = item['iccid'] # 检查激活iccid unicom_api.change_device_to_activate(iccid) cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage) logger.info('激活成功,订单编号:{}'.format(order_id)) return response.json(0) except Exception as e: logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(177, repr(e)) @classmethod def check_flow_usage(cls, response): """ 检查流量使用情况 @return: """ logger.info('--->进入监控流量使用情况') try: combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values() if not combo_order_qs.exists(): return response.json(0) asy = threading.Thread(target=UnicomComboTaskView.async_monitoring_flow, args=(combo_order_qs,)) asy.start() return response.json(0) except Exception as e: logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(177, repr(e)) @classmethod def async_monitoring_flow(cls, combo_order_qs): """ 异步检测流量使用详情 """ try: unicom_api = UnicomObjeect() today = datetime.datetime.today() year = today.year month = today.month now_time = int(time.time()) for item in combo_order_qs: iccid = item['iccid'] u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid) if not u_device_info_qs.exists(): continue u_device_info_qs = u_device_info_qs.first() card_type = u_device_info_qs.card_type activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0 combo_id = item['combo_id'] combo_qs = UnicomCombo.objects.filter(id=combo_id).values() if not combo_qs.exists(): continue combo_qs = combo_qs.first() flow_total = combo_qs['flow_total'] # 队列已使用总流量总量 flow_total_usage = unicom_api.get_flow_usage_total(iccid, card_type) flow_total_usage = float(flow_total_usage) is_expire = False flow = activate_usage_flow + flow_total # 激活套餐时ICCID历史用量+当前套餐流量 if flow_total_usage > 0: # 初始套餐已使用流量 + 套餐总流量 if flow_total_usage >= flow: is_expire = True usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0 cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total, usage) # 检查是否有当月未使用套餐 没有则停卡 if is_expire: flow_exceed = flow_total_usage - flow UnicomComboOrderInfo.objects.filter(id=item['id']) \ .update(status=2, updated_time=now_time, flow_exceed=flow_exceed) activate_status = cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage) logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status)) if not activate_status: # 停用或断网 if card_type == 3: # 鼎芯电信 TelecomService().update_access_number_network(iccid, u_device_info_qs.access_number, 'ADD', '套餐流量已用完') else: unicom_api.change_device_to_disable(iccid=iccid, reason='套餐流量已用完') except Exception as e: logger.info('异步~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage): """ 监控流量使用大于85%and小于96%进行消息推送提醒 @param app_user_id: app用户id @param serial_no: 序列号 @param combo_order_id: 当前套餐订单id @param flow_total: 套餐流量总量 @param flow_usage: 套餐已使用流量 @return: """ try: if not app_user_id: return False now_time = int(time.time()) push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no, 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0, 'updated_time': now_time, 'created_time': now_time, 'user_id': app_user_id} if 0 < flow_total and 0 < flow_usage < flow_total: res = flow_usage / flow_total * 100 if 85 < res <= 95: flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id) if not flow_push.exists(): UnicomFlowPush.objects.create(**push_data) elif flow_usage >= flow_total: push_data['flow_total_usage'] = flow_total push_data['type'] = 1 UnicomFlowPush.objects.create(**push_data) return True except Exception as e: logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def query_unused_combo_and_activate(iccid, year, month, usage_flow): """ 查询未使用套餐并激活 @param iccid: @param year: @param month: @param usage_flow: @return: """ try: now_time = int(time.time()) combo_order_qs = UnicomComboOrderInfo.objects \ .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \ .order_by('created_time') if not combo_order_qs.exists(): return False combo_order = combo_order_qs.first() if not combo_order.order_id: return False order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1) if not order_qs.exists(): return False upd_data = { 'status': 1, 'year': year, 'month': month, 'flow_total_usage': str(usage_flow), 'activation_time': now_time, 'updated_time': now_time, } UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data) asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push, args=(iccid, combo_order.id, 3)) asy.start() return True except Exception as e: logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return False @classmethod def check_flow_expire(cls, response): """ 检查流量到期停卡操作 @param response: @return: """ logger.info('check_flow_expire进入监控流量到期停卡或激活叠加包') now_time = int(time.time()) combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time, is_del=False).values() if not combo_order_qs.exists(): return response.json(0) asy = threading.Thread(target=UnicomComboTaskView.async_deactivate_expire_package, args=(combo_order_qs, now_time)) asy.start() return response.json(0) @staticmethod def async_deactivate_expire_package(combo_order_qs, now_time): """ 异步停用过期套餐 @param combo_order_qs: 过期套餐querySet @param now_time 当前实际 """ today = datetime.datetime.today() year = today.year month = today.month iccid_list = [] for item in combo_order_qs: try: icc_id = item['iccid'] um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id) if not um_device_qs.exists(): continue UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time) iccid_list.append(icc_id) logger.info('修改套餐已失效成功,iccid:{}'.format(icc_id)) except Exception as e: logger.info('async_deactivate_expire_package套餐过期修改失效异常,{}' 'errLine:{}, errMsg:{}'.format(item['iccid'], e.__traceback__.tb_lineno, repr(e))) continue # set无序不重复元素集 iccid_list = list(set(iccid_list)) unicom_api = UnicomObjeect() for item in iccid_list: try: activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time, is_del=False).values() if activate_combo_qs.exists(): continue usage_flow = unicom_api.get_flow_usage_total(item) # 查询是否有未使用套餐,有则进行激活 否 则调用API停卡 result = UnicomComboTaskView().query_unused_combo_and_activate(item, year, month, usage_flow) if not result: # 没有可用套餐进行停卡 # 停用设备 unicom_api.change_device_to_disable(iccid=item, reason='没有可用套餐') logger.info('调用停卡API successful,iccid:{}'.format(item)) combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \ .values('id').order_by('-updated_time') combo_order = combo_order_info_qs.first() asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push, args=(item, combo_order['id'], 4)) asy.start() else: unicom_api.change_device_to_activate(item) except Exception as e: logger.info('async_deactivate_expire_package套餐过期停卡异常,{}' 'errLine:{}, errMsg:{}'.format(item, e.__traceback__.tb_lineno, repr(e))) continue @staticmethod def async_combo_sys_msg_push(iccid, combo_order_id, push_type): """ 异步保存消息推送 激活|过期 @param iccid: @param combo_order_id: @param push_type: @return: """ try: now_time = int(time.time()) ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id') push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'], 'status': 0, 'type': push_type, 'updated_time': now_time, 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']} UnicomFlowPush.objects.create(**push_data) except Exception as e: logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def unicom_flow_used(request_dict, response): """ 查询设备每张卡流量使用情况 @param request_dict: @param response: @return: """ page_size = int(request_dict.get('pageSize', 1)) device_count = UnicomDeviceInfo.objects.filter(card_type=0).count() total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数 for page_number in range(1, total_pages + 1): u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by( '-created_time')[(page_number - 1) * page_size:page_number * page_size] asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,)) asy.start() return response.json(0) @staticmethod def thread_collect_flow_used(u_device_qs): for item in u_device_qs: try: unicom_api = UnicomObjeect() n_time = int(time.time()) # 队列已使用总流量总量 flow_total_usage = unicom_api.get_flow_usage_total(item['iccid']) UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time, sim_used_flow=flow_total_usage) except Exception as e: print(repr(e)) continue @classmethod def query_flow_used_history(cls, response): # 获取符合条件的卡片对象查询集,并按创建时间升序排序 card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').order_by('created_time') if not card_qs.exists(): return response.json(0) asy = threading.Thread(target=UnicomComboTaskView.async_bulk_create_usage_history, args=(card_qs,)) asy.start() return response.json(0) @staticmethod def async_bulk_create_usage_history(qs): """ 异步批量创建流量用量历史记录 """ redis_obj = RedisObject() current_time = int(time.time()) # 获取当前时间戳 for item in qs: iccid = item['iccid'] key = 'monthly_flow_' + iccid flow_dict = redis_obj.get_all_hash_data(key) # 获取Redis中指定键的哈希表数据 iot_card_list = [] # 创建一个空的列表,用于批量创建IotCardUsageHistory对象 for k, v in flow_dict.items(): try: cycle = datetime.datetime.strptime(str(k.decode()), '%Y-%m') # 将字符串日期解析为datetime类型 flow = float(v) iot_card_list.append(IotCardUsageHistory( iccid=iccid, card_type=1, cycle=int(cycle.strftime('%Y%m')), # 将日期转换为整数形式,如202201 flow_total_usage=flow, created_time=current_time, updated_time=current_time )) except Exception as e: print(repr(e)) continue # 批量创建IotCardUsageHistory对象 if iot_card_list: IotCardUsageHistory.objects.bulk_create(iot_card_list) @classmethod def query_flow_cache(cls, response): """ 查询流量缓存永久的将设置过期时间为10分钟 """ redis = RedisObject() try: res = redis.get_keys('ASJ:UNICOM:FLOW:*') keys = [key.decode() for key in res] # 进行进一步的处理或打印 for key in keys: ttl = redis.get_ttl(key) if ttl == -1: logger.info('iccidFlow:{}'.format(key)) redis.CONN.expire(key, 60 * 10) return response.json(0) except Exception as e: logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(177, repr(e)) @staticmethod def get_device_usage_history(response): """ 查询联通设备历史用量 """ card_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid') if not card_qs.exists(): return response.json(0) asy = threading.Thread(target=UnicomComboTaskView.async_update_device_usage_history, args=(card_qs,)) asy.start() return response.json(0) @staticmethod def async_update_device_usage_history(qs): """ 异步更新设备用量历史 @param qs: 联通iccid集合 """ try: u_service = UnicomObjeect() iot_card_list = [] now_time = int(time.time()) # 获取当前时间 current_date = datetime.datetime.now() # 计算上个月的时间 last_month_date = current_date - datetime.timedelta(days=current_date.day) # 例格式化日期为 "202307" formatted_date = last_month_date.strftime('%Y%m') for item in qs: params = {'iccid': item['iccid']} result = u_service.query_device_usage_history(**params) res_dict = u_service.get_text_dict(result) if res_dict['code'] == 0 and res_dict['data']: for cycle in res_dict['data']: if cycle['flowTotalUsage'] <= 0 or cycle['cycle'] != int(formatted_date): continue iot_card_list.append(IotCardUsageHistory( iccid=item['iccid'], card_type=1, cycle=cycle['cycle'], flow_total_usage=cycle['flowTotalUsage'], created_time=now_time, updated_time=now_time )) if not iot_card_list: return None # 使用Django的Paginator进行分页 paginator = Paginator(iot_card_list, 300) for page_num in range(1, paginator.num_pages + 1): IotCardUsageHistory.objects.bulk_create(paginator.page(page_num).object_list) except Exception as e: logger.info('查询账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return None @staticmethod def update_device_cycle_flow(response): """ 更新设备账期流量接口 """ card_qs = UnicomDeviceInfo.objects.filter(card_type=0, status=2).values('iccid') if not card_qs.exists(): return response.json(0) logger.info('总数:{}'.format(card_qs.count())) asy = threading.Thread(target=UnicomComboTaskView.async_update_device_cycle_flow, args=(card_qs,)) asy.start() return response.json(0) @staticmethod def async_update_device_cycle_flow(qs): """ 异步更新设备账期流量 """ try: logger.info('进入异步更新设备卡账期流量~~~~') unicom_api = UnicomObjeect() for item in qs: try: unicom_api.get_flow_usage_total(item['iccid']) except Exception as e: print(repr(e)) continue except Exception as e: logger.info('更新账期流量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return None @staticmethod def get_access_number_change_task(response): """ 获取接入号码变更任务 """ task_qs = AccessNumberTaskQueue.objects.exclude(status=1).filter(count__lt=4).order_by('created_time') if not task_qs.exists(): return response.json(0) logger.info('接入卡号变更任务总数:{}'.format(task_qs.count())) asy = threading.Thread(target=UnicomComboTaskView.async_update_access_number_status, args=(task_qs,)) asy.start() return response.json(0) @staticmethod def async_update_access_number_status(qs): """ 异步更新设备账期流量 """ try: logger.info('进入异步更新接入卡号网络状态~~~~') redis = RedisObject() iccid_list = [] for item in qs: try: key = f'ASJ:TELECOM:CHANGE:{item.iccid}' change_data = redis.get_data(key) if change_data: iccid_list.append(item.iccid) continue if item.iccid in iccid_list: # 避免同一ICCID多次执行 continue action_value = 'DEL' if item.action == 2 else 'ADD' result = TelecomObject().single_cut_net(item.access_number, action_value) # 变更网络状态 now_time = int(time.time()) count = item.count + 1 cache_data = {'iccid': item.iccid, 'actionValue': action_value} if not result: # 失败修改执行次数 iccid_list.append(item.iccid) data = {'status': 2, 'count': count, 'updated_time': now_time} AccessNumberTaskQueue.objects.filter(id=item.id).update(**data) redis.CONN.setnx(key, json.dumps(cache_data)) redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态 continue data = {'status': 1, 'count': count, 'completion_time': now_time, 'updated_time': now_time} if result == '-5': # 已符合变更后状态 data['result'] = {'-5': '该号码未订购单独断网功能!;流水号:1000000190202402201640359500'} else: data['result'] = result iccid_list.append(item.iccid) AccessNumberTaskQueue.objects.filter(id=item.id).update(**data) # 成功后修改任务记录状态 redis.CONN.setnx(key, json.dumps(cache_data)) redis.CONN.expire(key, 130) # 130 秒不再变更当前ICCID网络状态 except Exception as e: print(repr(e)) continue except Exception as e: logger.info('异步变更卡网络状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def query_total_traffic_today(response): """ 4G订单查询今日总流量消耗 @return: """ asy = threading.Thread(target=UnicomComboTaskView.async_save_today_use_float) asy.start() logger.info('查询4G订单总消耗流量记录') return response.json(0) @staticmethod def async_save_today_use_float(): # 查询正在使用套餐并且大于等于2024年激活 不等于免费测试流量的订单数据 today = datetime.datetime.today() n_data = today.strftime('%Y%m%d') start_time, end_time = LocalDateTimeUtil.get_start_and_end_time(today.strftime('%Y-%m-%d'), '%Y-%m-%d') o_combo_qs = UnicomComboOrderInfo.objects \ .filter(status=1, year__gte=2024, created_time__gte=1714492800, created_time__lt=start_time) \ .exclude(combo__combo_type=4) \ .values('order_id', 'iccid', 'flow_total_usage') try: unicom_api = UnicomObjeect() if not o_combo_qs.exists(): return n_time = int(time.time()) day = n_data[-2:] iccid_list = [] for item in o_combo_qs: iccid = item['iccid'] try: flow = float(unicom_api.get_flow_usage_total(iccid)) if flow == 0: continue old_flow = float(item['flow_total_usage']) if flow == old_flow: continue total_usage = Decimal(flow - old_flow).quantize(Decimal('0.00')) iccid_list.append(IotCardOrderUsageHistory( iccid=iccid, order_id=item['order_id'], card_type=1, cycle=int(n_data), # 将日期转换为整数形式,如202201 cycle_date=int(day), total_traffic=total_usage, created_time=n_time, updated_time=n_time )) if len(iccid_list) >= 300: IotCardOrderUsageHistory.objects.bulk_create(iccid_list) iccid_list = [] except Exception as e: logger.error( '查询日用量异常iccid{},errLine:{}, errMsg:{}'.format(iccid, e.__traceback__.tb_lineno, repr(e))) if iccid_list: IotCardOrderUsageHistory.objects.bulk_create(iccid_list) except Exception as e: logger.error('统计4G卡日用量异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))