#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # # Copyright (C) 2022 # # @Time : 2022/4/1 11:27 # @Author : ming # @Email : zhangdongming@asj6.wecom.work # @File : CronTaskController.py # @Software: PyCharm import datetime import threading import time import requests from django.db import connection, connections, transaction from django.db.models import Q, Sum, Count from django.views import View from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \ VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \ CountryModel, DeviceTypeModel, Lang, UnicomCombo, OrdersSummary, DeviceInfoSummary, CompanySerialModel from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject from Object.utils import LocalDateTimeUtil from Service.CommonService import CommonService from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST class CronDelDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'delAccessLog': # 定时删除访问接口数据 return self.delAccessLog(response) elif operation == 'delPushInfo': # 定时删除推送数据 return self.delPushInfo(response) elif operation == 'delVodHls': # 定时删除云存播放列表 return self.delVodHls(response) elif operation == 'delCloudLog': # 定时删除云存接口数据 return self.delCloudLog(response) elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据 return self.delTesterDevice(response) else: return response.json(404) @staticmethod def delAccessLog(response): try: cursor = connection.cursor() # 删除7天前的数据 last_week = LocalDateTimeUtil.get_last_week() sql = 'DELETE FROM access_log WHERE time < %s limit %s' cursor.execute(sql, [last_week, 10000]) # 关闭游标 cursor.close() connection.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delPushInfo(response): now_time = int(time.time()) cursor = connections['mysql02'].cursor() try: # 当前时间转日期 local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date()) # 根据日期获取周几 week_val = LocalDateTimeUtil.date_to_week(local_date_now) # 根据当前时间获取7天前时间戳 expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7) # 每次删除条数 size = 10000 # 删除7天前的数据 sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s " for i in range(6): cursor.execute(sql, [expiration_time, size]) if week_val == 1: sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s " if week_val == 2: sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s " if week_val == 3: sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s " if week_val == 4: sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s " if week_val == 5: sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s " if week_val == 6: sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s " if week_val == 7: sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s " for i in range(5): cursor.execute(sql, [expiration_time, size]) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delVodHls(response): nowTime = int(time.time()) try: cursor = connection.cursor() month_ago_time = nowTime - 30 * 24 * 60 * 60 # 删除1个月前的数据 sql = 'DELETE FROM `vod_hls` WHERE endTime<{} LIMIT 50000'.format(month_ago_time) cursor.execute(sql) cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delCloudLog(response): nowTime = int(time.time()) cursor = connection.cursor() try: # 删除3个月前的数据 sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format( nowTime - 3 * 30 * 24 * 60 * 60) cursor.execute(sql) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delTesterDevice(response): try: userID_list = [ 'tech01@ansjer.com', 'tech02@ansjer.com', 'tech03@ansjer.com', 'tech04@ansjer.com', 'tech05@ansjer.com', 'tech06@ansjer.com', 'tech07@ansjer.com', 'tech08@ansjer.com', 'tech09@ansjer.com', 'tech10@ansjer.com', 'fix01@ansjer.com', 'fix02@ansjer.com', 'fix03@ansjer.com', 'fix04@ansjer.com', 'fix05@ansjer.com'] device_user = Device_User.objects.filter(username__in=userID_list) device_info_qs = Device_Info.objects.filter( userID__in=device_user).values('UID') uid_list = [] for device_info in device_info_qs: uid_list.append(device_info['UID']) with transaction.atomic(): # 删除设备云存相关数据 UidSetModel.objects.filter(uid__in=uid_list).delete() UID_Bucket.objects.filter(uid__in=uid_list).delete() Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete() Order_Model.objects.filter(UID__in=uid_list).delete() StsCrdModel.objects.filter(uid__in=uid_list).delete() VodHlsModel.objects.filter(uid__in=uid_list).delete() ExperienceContextModel.objects.filter( uid__in=uid_list).delete() Device_Info.objects.filter(userID__in=device_user).delete() return response.json(0) except Exception as e: return response.json(500, repr(e)) class CronUpdateDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态 return self.updateUnusedUidBucket(response) elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态 return self.updateUnusedAiService(response) elif operation == 'reqUpdateSerialStatus': # 定时请求更新序列号状态 return self.reqUpdateSerialStatus(response) elif operation == 'updateSerialStatus': # 更新序列号状态 return self.updateSerialStatus(request_dict, response) else: return response.json(404) @staticmethod def updateUnusedUidBucket(response): """ 监控云存套餐过期修改状态 @param response: @return: """ # 定时更新已过期套餐修改状态为2 now_time = int(time.time()) expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time) expired_uid_bucket = expired_uid_bucket.filter(~Q(use_status=2)).values('id') if expired_uid_bucket.exists(): expired_uid_bucket.update(use_status=2) # 监控有未使用套餐则自动生效 expired_uid_buckets = \ UID_Bucket.objects.filter(endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000] for expired_uid_bucket in expired_uid_buckets: unuseds = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).values( "id", "uid", "channel", "addTime", "expire", "num", "bucket_id").order_by('addTime')[0:1] if not unuseds.exists(): continue unused = unuseds[0] try: with transaction.atomic(): count_unused = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).count() has_unused = 1 if count_unused > 1 else 0 endTime = CommonService.calcMonthLater( unused['expire'] * unused['num']) UID_Bucket.objects.filter( uid=expired_uid_bucket['uid']).update( channel=unused['channel'], endTime=endTime, bucket_id=unused['bucket_id'], updateTime=now_time, use_status=1, has_unused=has_unused) Unused_Uid_Meal.objects.filter(id=unused['id']).delete() StsCrdModel.objects.filter( uid=expired_uid_bucket['uid']).delete() # 删除sts记录 except Exception as e: print(repr(e)) continue return response.json(0) @staticmethod def updateUnusedAiService(response): now_time = int(time.time()) ai_service_qs = AiService.objects.filter( endTime__lte=now_time, use_status=1).values( 'id', 'uid')[ 0:200] for ai_service in ai_service_qs: try: with transaction.atomic(): AiService.objects.filter( id=ai_service['id']).update( use_status=2) # 更新过期ai订单状态 # 如果存在未使用套餐,更新为使用 unused_ai_service = AiService.objects.filter( uid=ai_service['uid'], use_status=0).order_by('addTime')[ :1].values( 'id', 'endTime') if unused_ai_service.exists(): # 未使用套餐的endTime在购买的时候保存为有效时间 effective_day = unused_ai_service[0]['endTime'] endTime = now_time + effective_day AiService.objects.filter( id=unused_ai_service[0]['id']).update( use_status=1, endTime=endTime, updTime=now_time) except Exception: continue return response.json(0) @classmethod def reqUpdateSerialStatus(cls, response): redis_obj = RedisObject() # 更新已使用序列号其他服务器的状态 used_serial_redis_list_len = redis_obj.llen(USED_SERIAL_REDIS_LIST) if used_serial_redis_list_len > 0: used_serial_redis_list = [] for i in range(used_serial_redis_list_len): used_serial_redis_list.append(redis_obj.lpop(USED_SERIAL_REDIS_LIST)) request_thread = threading.Thread(target=cls.do_request_thread, args=(str(used_serial_redis_list), 3)) request_thread.start() # 更新未使用序列号其他服务器的状态 unused_serial_redis_list_len = redis_obj.llen(UNUSED_SERIAL_REDIS_LIST) if unused_serial_redis_list_len > 0: unused_serial_redis_list = [] for i in range(unused_serial_redis_list_len): unused_serial_redis_list.append(redis_obj.lpop(UNUSED_SERIAL_REDIS_LIST)) request_thread = threading.Thread(target=cls.do_request_thread, args=(str(unused_serial_redis_list), 1)) request_thread.start() return response.json(0) @staticmethod def do_request_thread(serial_redis_list, status): """ 请求更新序列号线程 @param serial_redis_list: 序列号redis列表 @param status: 状态, 1: 未使用, 3: 已占用 """ data = { 'serial_redis_list': serial_redis_list, 'status': status } # 确认域名列表 orders_domain_name_list = CommonService.get_orders_domain_name_list() for domain_name in orders_domain_name_list: url = '{}cron/update/updateSerialStatus'.format(domain_name) requests.post(url=url, data=data, timeout=2) @staticmethod def updateSerialStatus(request_dict, response): """ 更新序列号状态 @param request_dict: 请求参数 @request_dict serial_redis_list: 序列号redis列表 @request_dict status: 状态, 1: 未使用, 3: 已占用 @param response: 响应对象 """ serial_redis_list = request_dict.get('serial_redis_list', None) status = request_dict.get('status', None) if not all([serial_redis_list, status]): return response.json(444) serial_redis_list = eval(serial_redis_list) CompanySerialModel.objects.filter(serial_number__in=serial_redis_list).update(status=int(status)) class CronCollectDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'collectPlayBack': # 定时保存云存视频回放 return self.collect_play_back(response) elif operation == 'collectDeviceUser': # 定时保存用户数据 return self.collect_device_user(response) elif operation == 'collectOrder': # 定时保存订单数据 return self.collect_order(response) elif operation == 'collectDeviceInfo': # 定时保存设备数据 return self.collect_device_info(response) else: return response.json(404) @staticmethod def collect_play_back(response): try: now_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) this_month_str = datetime.datetime(today.year, today.month, 1) this_month_stamp = CommonService.str_to_timestamp(this_month_str.strftime('%Y-%m-%d %H:%M:%S')) video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time, startTime__lt=end_time, playMode='cloud').values('uid').annotate( play_duration=Sum('duration'), play_frequency=Count('uid')) with transaction.atomic(): for item in video_play_back_time_qs: vod_hls_summary_qs = VodHlsSummary.objects.filter(uid=item['uid'], time=this_month_stamp) if vod_hls_summary_qs.exists(): vod_hls_summary = vod_hls_summary_qs.first() vod_hls_summary.play_duration += item['play_duration'] vod_hls_summary.play_frequency += 1 vod_hls_summary.updated_time = now_time vod_hls_summary.save() else: VodHlsSummary.objects.create(uid=item['uid'], time=this_month_stamp, created_time=now_time, play_duration=item['play_duration'], play_frequency=1, updated_time=now_time) return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def collect_device_user(response): try: created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, 23) end_time = start_time + datetime.timedelta(days=1) increase_user_qs = Device_User.objects.filter(data_joined__gte=start_time, data_joined__lt=end_time).values( 'data_joined', 'region_country') active_user_qs = Device_User.objects.filter(last_login__gte=start_time, last_login__lt=end_time).values( 'last_login', 'region_country') start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) country_qs = CountryModel.objects.all().values('id', 'region__name', 'country_name') country_dict = {} continent_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] continent_dict[item['country_name']] = item['region__name'] with transaction.atomic(): if increase_user_qs.exists(): increase_user_count = increase_user_qs.count() increase_user_country_list = increase_user_qs.values('region_country').annotate( count=Count('region_country')).order_by('count') increase_user_country_dict = {} increase_user_continent_dict = {} for item in increase_user_country_list: country_name = country_dict.get(item['region_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') increase_user_country_dict[country_name] = item['count'] if continent_name not in increase_user_continent_dict: increase_user_continent_dict[continent_name] = 0 increase_user_continent_dict[continent_name] += item['count'] DeviceUserSummary.objects.create(time=start_time, count=increase_user_count, country=increase_user_country_dict, created_time=created_time, continent=increase_user_continent_dict) if active_user_qs.exists(): active_user_count = active_user_qs.count() active_user_country_list = active_user_qs.values('region_country').annotate( count=Count('region_country')).order_by( 'count') active_user_country_dict = {} active_user_continent_dict = {} for item in active_user_country_list: country_name = country_dict.get(item['region_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') active_user_country_dict[country_name] = item['count'] if continent_name not in active_user_continent_dict: active_user_continent_dict[continent_name] = 0 active_user_continent_dict[continent_name] += item['count'] DeviceUserSummary.objects.create(time=start_time, query_type=1, count=active_user_count, country=active_user_country_dict, created_time=created_time, continent=active_user_continent_dict) return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def collect_order(response): try: created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time, status=1).values('UID', 'order_type', 'store_meal_name', 'price', 'addTime', 'currency').order_by( 'addTime') uid_list = [] all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1).values('UID') for item in all_order_qs: if item['UID'] not in uid_list: uid_list.append(item['UID']) # 国家表数据 country_qs = CountryModel.objects.values('id', 'country_name') country_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] # 设备类型数据 device_type_qs = DeviceTypeModel.objects.values('name', 'type') device_type_dict = {} for item in device_type_qs: device_type_dict[item['type']] = item['name'] with transaction.atomic(): for item in order_qs: is_pay = 0 price = float(item['price']) currency = item['currency'] uid_set_qs = UidSetModel.objects.filter(uid=item['UID']).values('tb_country') country_id = uid_set_qs[0]['tb_country'] if uid_set_qs.exists() else 0 country_name = country_dict.get(country_id, '未知国家') order_type = item['order_type'] device_info_qs = Device_Info.objects.filter(UID=item['UID']).values('Type') device_type_id = device_info_qs[0]['Type'] if device_info_qs.exists() else 0 device_type_name = device_type_dict.get(device_type_id, '未知设备') store_meal_name = item['store_meal_name'] add_time_stamp = item['addTime'] add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp)) add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day) add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S')) if price == 0: is_pay = 1 order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=1, service_type=order_type) else: order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0, service_type=order_type) if item['UID'] not in uid_list: pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2, service_type=order_type) query_type = 2 else: pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3, service_type=order_type) query_type = 3 if pay_order_summary_qs.exists(): pay_order_summary = pay_order_summary_qs.first() pay_order_summary.count += 1 temp_total = eval(pay_order_summary.total) if currency not in temp_total: temp_total[currency] = price else: temp_total[currency] = round(temp_total[currency] + price, 2) pay_order_summary.total = temp_total country_temp_dict = eval(pay_order_summary.country) if country_name in country_temp_dict: country_temp_dict[country_name]['数量'] += 1 if currency not in country_temp_dict[country_name]: country_temp_dict[country_name][currency] = price else: country_temp_dict[country_name][currency] = round( country_temp_dict[country_name][currency] + price, 2) else: country_temp_dict[country_name] = {'数量': 1, currency: price} pay_order_summary.country = country_temp_dict device_type_temp_dict = eval(pay_order_summary.device_type) if device_type_name in device_type_temp_dict: device_type_temp_dict[device_type_name]['数量'] += 1 if currency not in device_type_temp_dict[device_type_name]: device_type_temp_dict[device_type_name][currency] = price else: device_type_temp_dict[device_type_name][currency] = round( device_type_temp_dict[device_type_name][currency] + price, 2) else: device_type_temp_dict[device_type_name] = {'数量': 1, currency: price} pay_order_summary.device_type = device_type_temp_dict store_meal_temp_dict = eval(pay_order_summary.store_meal) if store_meal_name in store_meal_temp_dict: store_meal_temp_dict[store_meal_name]['数量'] += 1 if currency not in store_meal_temp_dict[store_meal_name]: store_meal_temp_dict[store_meal_name][currency] = price else: store_meal_temp_dict[store_meal_name][currency] = round( store_meal_temp_dict[store_meal_name][currency] + price, 2) else: store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price} pay_order_summary.store_meal = store_meal_temp_dict pay_order_summary.save() else: final_total = {currency: price} country_temp_dict = { country_name: { '数量': 1, currency: price } } device_type_temp_dict = { device_type_name: { '数量': 1, currency: price } } store_meal_temp_dict = { store_meal_name: { '数量': 1, currency: price } } OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type, service_type=order_type, total=final_total, country=country_temp_dict, created_time=created_time, device_type=device_type_temp_dict, store_meal=store_meal_temp_dict) if order_summary_qs.exists(): order_summary = order_summary_qs.first() order_summary.count += 1 temp_total = eval(order_summary.total) if currency not in temp_total: temp_total[currency] = price else: temp_total[currency] = round(temp_total[currency] + price, 2) order_summary.total = temp_total country_temp_dict = eval(order_summary.country) if country_name in country_temp_dict: if is_pay == 0: country_temp_dict[country_name]['数量'] += 1 if currency not in country_temp_dict[country_name]: country_temp_dict[country_name][currency] = price else: country_temp_dict[country_name][currency] = round( country_temp_dict[country_name][currency] + price, 2) else: country_temp_dict[country_name] += 1 else: if is_pay == 0: country_temp_dict[country_name] = {'数量': 1, currency: price} else: country_temp_dict[country_name] = 1 order_summary.country = country_temp_dict device_type_temp_dict = eval(order_summary.device_type) if device_type_name in device_type_temp_dict: if is_pay == 0: device_type_temp_dict[device_type_name]['数量'] += 1 if currency not in device_type_temp_dict[device_type_name]: device_type_temp_dict[device_type_name][currency] = price else: device_type_temp_dict[device_type_name][currency] = round( device_type_temp_dict[device_type_name][currency] + price, 2) else: device_type_temp_dict[device_type_name] += 1 else: if is_pay == 0: device_type_temp_dict[device_type_name] = {'数量': 1, currency: price} else: device_type_temp_dict[device_type_name] = 1 order_summary.device_type = device_type_temp_dict store_meal_temp_dict = eval(order_summary.store_meal) if store_meal_name in store_meal_temp_dict: if is_pay == 0: store_meal_temp_dict[store_meal_name]['数量'] += 1 if currency not in store_meal_temp_dict[store_meal_name]: store_meal_temp_dict[store_meal_name][currency] = price else: store_meal_temp_dict[store_meal_name][currency] = round( store_meal_temp_dict[store_meal_name][currency] + price, 2) else: store_meal_temp_dict[store_meal_name] += 1 else: if is_pay == 0: store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price} else: store_meal_temp_dict[store_meal_name] = 1 order_summary.store_meal = store_meal_temp_dict order_summary.save() else: final_total = {currency: price} if is_pay == 0: country_temp_dict = { country_name: { '数量': 1, currency: price } } device_type_temp_dict = { device_type_name: { '数量': 1, currency: price } } store_meal_temp_dict = { store_meal_name: { '数量': 1, currency: price } } else: device_type_temp_dict = { device_type_name: 1 } store_meal_temp_dict = { store_meal_name: 1 } country_temp_dict = { country_name: 1 } OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=is_pay, service_type=order_type, total=final_total, country=country_temp_dict, created_time=created_time, device_type=device_type_temp_dict, store_meal=store_meal_temp_dict) return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def collect_device_info(response): try: created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) increase_device_qs = UidSetModel.objects.filter(addTime__gte=start_time, addTime__lt=end_time).values( 'tb_country', 'uid', 'device_type', 'cloud_vod', 'is_ai', 'mobile_4g', 'addTime') video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time, startTime__lt=end_time).values('uid') active_device_qs = UidSetModel.objects.filter(uid__in=video_play_back_time_qs).values('tb_country', 'addTime', 'device_type', 'cloud_vod', 'is_ai', 'mobile_4g', 'uid') increase_device_count = increase_device_qs.count() active_device_count = active_device_qs.count() # 国家表数据 country_qs = CountryModel.objects.values('id', 'country_name', 'region__name') country_dict = {} continent_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] continent_dict[item['country_name']] = item['region__name'] # 设备类型数据 device_type_qs = DeviceTypeModel.objects.values('name', 'type') device_type_dict = {} for item in device_type_qs: device_type_dict[item['type']] = item['name'] with transaction.atomic(): if increase_device_qs.exists(): # 国家大洲设备数据 increase_device_country_list = increase_device_qs.values('tb_country').annotate( count=Count('tb_country')).order_by('count') increase_device_country_dict = {} increase_device_continent_dict = {} for item in increase_device_country_list: country_name = country_dict.get(item['tb_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') increase_device_country_dict[country_name] = item['count'] if continent_name not in increase_device_continent_dict: increase_device_continent_dict[continent_name] = 0 increase_device_continent_dict[continent_name] += item['count'] # 设备类型数据 increase_device_type_list = increase_device_qs.values('device_type').annotate( count=Count('device_type')).order_by('count') increase_device_type_dict = {} for item in increase_device_type_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_type_dict[type_name] = item['count'] # 云存设备类型数据 increase_device_vod_list = increase_device_qs.filter(~Q(cloud_vod=2)).values( 'device_type').annotate( count=Count('device_type')).order_by('count') increase_device_vod_dict = {} for item in increase_device_vod_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_vod_dict[type_name] = item['count'] # AI设备类型数据 increase_device_ai_list = increase_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') increase_device_ai_dict = {} for item in increase_device_ai_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_ai_dict[type_name] = item['count'] # 联通设备类型数据 increase_device_unicom_list = increase_device_qs.filter(~Q(mobile_4g=2)).values( 'device_type').annotate( count=Count('device_type')).order_by('count') increase_device_unicom_dict = {} for item in increase_device_unicom_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_unicom_dict[type_name] = item['count'] DeviceInfoSummary.objects.create(time=start_time, count=increase_device_count, query_type=0, created_time=created_time, country=increase_device_country_dict, continent=increase_device_continent_dict, vod_service=increase_device_vod_dict, ai_service=increase_device_ai_dict, unicom_service=increase_device_unicom_dict, device_type=increase_device_type_dict) if active_device_qs.exists(): # 国家大洲设备数据 active_device_country_list = active_device_qs.values('tb_country').annotate( count=Count('tb_country')).order_by('count') active_device_country_dict = {} active_device_continent_dict = {} for item in active_device_country_list: country_name = country_dict.get(item['tb_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') active_device_country_dict[country_name] = item['count'] if continent_name not in active_device_continent_dict: active_device_continent_dict[continent_name] = 0 active_device_continent_dict[continent_name] += item['count'] # 设备类型数据 active_device_type_list = active_device_qs.values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_type_dict = {} for item in active_device_type_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_type_dict[type_name] = item['count'] # 云存设备类型数据 active_device_vod_list = active_device_qs.filter(~Q(cloud_vod=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_vod_dict = {} for item in active_device_vod_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_vod_dict[type_name] = item['count'] # AI设备类型数据 active_device_ai_list = active_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_ai_dict = {} for item in active_device_ai_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_ai_dict[type_name] = item['count'] # 联通设备类型数据 active_device_unicom_list = active_device_qs.filter(~Q(mobile_4g=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_unicom_dict = {} for item in active_device_unicom_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_unicom_dict[type_name] = item['count'] DeviceInfoSummary.objects.create(time=start_time, count=active_device_count, query_type=1, created_time=created_time, country=active_device_country_dict, continent=active_device_continent_dict, vod_service=active_device_vod_dict, ai_service=active_device_ai_dict, unicom_service=active_device_unicom_dict, device_type=active_device_type_dict) return response.json(0) except Exception as e: return response.json(500, repr(e))