# -*- encoding: utf-8 -*- """ @File : UnicomComboTaskController.py @Time : 2022/6/30 16:23 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import datetime import logging import threading import time from decimal import Decimal from django.db import transaction from django.db.models import Q from django.views import View from Model.models import UnicomComboOrderInfo, UnicomCombo, Order_Model, UnicomDeviceInfo, UnicomFlowPush from Object.ResponseObject import ResponseObject from Object.UnicomObject import UnicomObjeect logger = logging.getLogger('info') class UnicomComboTaskView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() print(request) if operation == 'check-activate': return self.check_activate_combo(request_dict, response) elif operation == 'check-flow': return self.check_flow_usage(response) elif operation == 'check-flow-expire': return self.check_flow_expire(response) elif operation == 'check-expire': today = datetime.datetime.today() year = today.year month = today.month self.query_unused_combo_and_activate(request_dict.get('iccid'), year, month, '666') return response.json(0) elif operation == 'updateFlowUsed': # 更新流量使用 self.unicom_flow_used(request_dict, response) return response.json(0) @classmethod def check_activate_combo(cls, request_dict, response): """ 定时检查是否有次月激活套餐 @param request_dict: @param response: @return: """ print(request_dict) logger.info('--->进入监控次月激活联通套餐') now_time = int(time.time()) combo_order_info_qs = UnicomComboOrderInfo.objects.filter(status=0, next_month_activate=True, activation_time__lte=now_time, expire_time__gte=now_time, is_del=0).values() if not combo_order_info_qs.exists(): return response.json(0) try: today = datetime.datetime.today() year = today.year month = today.month with transaction.atomic(): unicom_api = UnicomObjeect() for item in combo_order_info_qs: if item['order_id']: order_id = item['order_id'] order_qs = Order_Model.objects.filter(orderID=order_id, status=1) if not order_qs.exists(): continue combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, iccid=item['iccid']) # 当前已有套餐正在使用则跳出当前循环 if combo_order_qs.exists(): continue combo_id = item['combo_id'] combo_qs = UnicomCombo.objects.filter(id=combo_id).values() if not combo_qs.exists(): continue # 查询当月用量情况 flow_total_usage = unicom_api.get_flow_usage_total(item['iccid']) flow_total_usage = Decimal(flow_total_usage).quantize( Decimal('0.00')) if flow_total_usage > 0 else 0 flow_total_usage = str(flow_total_usage) iccid = item['iccid'] # 检查激活iccid unicom_api.change_device_to_activate(iccid) cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage) logger.info('激活成功,订单编号:{}'.format(order_id)) return response.json(0) except Exception as e: logger.info('出错了~次月激活套餐异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(177, repr(e)) @classmethod def check_flow_usage(cls, response): """ 检查流量使用情况 @return: """ logger.info('--->进入监控流量使用情况') try: unicom_api = UnicomObjeect() combo_order_qs = UnicomComboOrderInfo.objects.filter(status=1, is_del=False, combo__is_unlimited=0).values() if not combo_order_qs.exists(): return response.json(0) today = datetime.datetime.today() year = today.year month = today.month now_time = int(time.time()) for item in combo_order_qs: iccid = item['iccid'] u_device_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid) if not u_device_info_qs.exists(): continue u_device_info_qs = u_device_info_qs.first() activate_usage_flow = float(item['flow_total_usage']) if item['flow_total_usage'] else 0.0 combo_id = item['combo_id'] combo_qs = UnicomCombo.objects.filter(id=combo_id).values() if not combo_qs.exists(): continue combo_qs = combo_qs.first() flow_total = combo_qs['flow_total'] # 队列已使用总流量总量 flow_total_usage = unicom_api.get_flow_usage_total(iccid) is_expire = False flow = activate_usage_flow + flow_total if flow_total_usage > 0: # 初始套餐已使用流量 + 套餐总流量 if flow_total_usage >= flow: is_expire = True usage = (flow_total_usage - activate_usage_flow) if flow_total_usage > activate_usage_flow else 0 cls.flow_warning_push(u_device_info_qs.user_id, u_device_info_qs.serial_no, item['id'], flow_total, usage) # 检查是否有当月未使用套餐 没有则停卡 if is_expire: flow_exceed = flow_total_usage - flow UnicomComboOrderInfo.objects.filter(id=item['id']) \ .update(status=2, updated_time=now_time, flow_exceed=flow_exceed) activate_status = cls.query_unused_combo_and_activate(iccid, year, month, flow_total_usage) logger.info('-->当前卡{}流量已用完,是否有生效套餐:{}'.format(iccid, activate_status)) if not activate_status: # 停用 unicom_api.change_device_to_disable(iccid) return response.json(0) except Exception as e: logger.info('出错了~检测流量用量详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(177, repr(e)) @staticmethod def flow_warning_push(app_user_id, serial_no, combo_order_id, flow_total, flow_usage): """ 监控流量使用大于85%and小于96%进行消息推送提醒 @param app_user_id: app用户id @param serial_no: 序列号 @param combo_order_id: 当前套餐订单id @param flow_total: 套餐流量总量 @param flow_usage: 套餐已使用流量 @return: """ try: if not app_user_id: return False now_time = int(time.time()) push_data = {'combo_order_id': str(combo_order_id), 'serial_no': serial_no, 'flow_total_usage': flow_usage, 'flow_total': flow_total, 'status': 0, 'updated_time': now_time, 'created_time': now_time, 'user_id': app_user_id} if 0 < flow_total and 0 < flow_usage < flow_total: res = flow_usage / flow_total * 100 if 85 < res <= 95: flow_push = UnicomFlowPush.objects.filter(serial_no=serial_no, combo_order_id=combo_order_id) if not flow_push.exists(): UnicomFlowPush.objects.create(**push_data) elif flow_usage >= flow_total: push_data['flow_total_usage'] = flow_total push_data['type'] = 1 UnicomFlowPush.objects.create(**push_data) return True except Exception as e: logger.info('出错了~异常流量监控,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def query_unused_combo_and_activate(iccid, year, month, usage_flow): """ 查询未使用套餐并激活 @param iccid: @param year: @param month: @param usage_flow: @return: """ try: now_time = int(time.time()) combo_order_qs = UnicomComboOrderInfo.objects \ .filter(expire_time__gt=now_time, activation_time__lte=now_time, status=0, iccid=iccid) \ .order_by('created_time') if not combo_order_qs.exists(): return False combo_order = combo_order_qs.first() if not combo_order.order_id: return False order_qs = Order_Model.objects.filter(orderID=combo_order.order_id, status=1) if not order_qs.exists(): return False upd_data = { 'status': 1, 'year': year, 'month': month, 'flow_total_usage': str(usage_flow), 'activation_time': now_time, 'updated_time': now_time, } UnicomComboOrderInfo.objects.filter(id=combo_order.id).update(**upd_data) asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push, args=(iccid, combo_order.id, 3)) asy.start() return True except Exception as e: logger.info('出错了~激活套餐,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return False @classmethod def check_flow_expire(cls, response): """ 检查流量到期停卡操作 @param response: @return: """ logger.info('--->进入监控流量到期停卡或激活叠加包') now_time = int(time.time()) combo_order_qs = UnicomComboOrderInfo.objects.filter(~Q(status=2), expire_time__lte=now_time, is_del=False).values() today = datetime.datetime.today() year = today.year month = today.month if not combo_order_qs.exists(): return response.json(0) iccid_list = [] with transaction.atomic(): for item in combo_order_qs: try: icc_id = item['iccid'] um_device_qs = UnicomDeviceInfo.objects.filter(iccid=icc_id) if not um_device_qs.exists(): continue UnicomComboOrderInfo.objects.filter(id=item['id']).update(status=2, updated_time=now_time) iccid_list.append(icc_id) logger.info('--->当前流量套餐已过期,iccid:{}'.format(icc_id)) except Exception as e: logger.info('出错了~监控流量到期异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) continue # set无序不重复元素集 iccid_list = list(set(iccid_list)) unicom_api = UnicomObjeect() for item in iccid_list: activate_combo_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=1, expire_time__gt=now_time, is_del=False).values() if activate_combo_qs.exists(): continue usage_flow = unicom_api.get_flow_usage_total(item) result = cls.query_unused_combo_and_activate(item, year, month, usage_flow) if not result: # 停用设备 unicom_api.change_device_to_disable(item) combo_order_info_qs = UnicomComboOrderInfo.objects.filter(iccid=item, status=2) \ .values('id').order_by('-updated_time') combo_order = combo_order_info_qs.first() asy = threading.Thread(target=UnicomComboTaskView.async_combo_sys_msg_push, args=(icc_id, combo_order['id'], 4)) asy.start() else: unicom_api.change_device_to_activate(item) return response.json(0) @staticmethod def async_combo_sys_msg_push(iccid, combo_order_id, push_type): """ 异步保存消息推送 激活|过期 @param iccid: @param combo_order_id: @param push_type: @return: """ try: now_time = int(time.time()) ud_info_qs = UnicomDeviceInfo.objects.filter(iccid=iccid).values('serial_no', 'user_id') push_data = {'combo_order_id': str(combo_order_id), 'serial_no': ud_info_qs.first()['serial_no'], 'status': 0, 'type': push_type, 'updated_time': now_time, 'created_time': now_time, 'user_id': ud_info_qs.first()['user_id']} UnicomFlowPush.objects.create(**push_data) except Exception as e: logger.info('-->出错了~,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def unicom_flow_used(request_dict, response): """ 查询设备每张卡流量使用情况 @param request_dict: @param response: @return: """ page_size = int(request_dict.get('pageSize', 1)) device_count = UnicomDeviceInfo.objects.filter(card_type=0).count() total_pages = device_count // page_size + (device_count % page_size > 0) # 计算总页数 for page_number in range(1, total_pages + 1): u_device_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('id', 'iccid', 'sim_used_flow').order_by( '-created_time')[(page_number - 1) * page_size:page_number * page_size] asy = threading.Thread(target=UnicomComboTaskView.thread_collect_flow_used, args=(u_device_qs,)) asy.start() return response.json(0) @staticmethod def thread_collect_flow_used(u_device_qs): for item in u_device_qs: try: unicom_api = UnicomObjeect() n_time = int(time.time()) # 队列已使用总流量总量 flow_total_usage = unicom_api.get_flow_usage_total(item['iccid']) UnicomDeviceInfo.objects.filter(id=item['id']).update(updated_time=n_time, sim_used_flow=flow_total_usage) except Exception as e: print(repr(e)) continue