#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # # Copyright (C) 2022 # # @Time : 2022/4/1 11:27 # @Author : ming # @Email : zhangdongming@asj6.wecom.work # @File : CronTaskController.py # @Software: PyCharm import datetime import threading import time import requests from django.db import connection, connections, transaction from django.db.models import Q, Sum, Count from django.views import View from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST, CONFIG_INFO, CONFIG_US, \ RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \ VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \ CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \ CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, IcloudService, \ Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject from Object.utils import LocalDateTimeUtil from Service.CommonService import CommonService from Service.VodHlsService import SplitVodHlsObject from Object.UnicomObject import UnicomObjeect class CronDelDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'delAccessLog': # 定时删除访问接口数据 return self.delAccessLog(response) elif operation == 'delPushInfo': # 定时删除推送数据 return self.delPushInfo(response) elif operation == 'delVodHls': # 定时删除云存播放列表 return self.delVodHls(response) elif operation == 'delCloudLog': # 定时删除云存接口数据 return self.delCloudLog(response) elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据 return self.delTesterDevice(response) elif operation == 'delAppLog': # 定时删除app日志 return self.delAppLog(response) elif operation == 'UpdateConfiguration': # 定时更新配置 return self.UpdateConfiguration(response) elif operation == 'cloud-log': return self.uid_cloud_storage_upload_count(response) elif operation == 'delDeviceLog': # 定时删除设备日志 return self.del_device_log(response) else: return response.json(404) @staticmethod def UpdateConfiguration(response): """ 定时更新配置 @param response: 响应对象 @return: """ try: ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', '823C01850XA', '730201350AA', '730201350AA', '730201450AA', '730201450MA', '72V201252AA', '72V201253AA', '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA'] UidSetModel.objects.filter(ucode__in=ucode_list, is_human=0).update(is_human=1) UidSetModel.objects.filter(ucode='72V201254AA', mobile_4g=0).update(mobile_4g=1) # 根据设备规格码定时更新默认算法类型类型 ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', 'C18201550KA', '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA'] UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=47) ucode_list = ['730201350AA', '730201450AA', '730201450MA', '730201450NA'] UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=7) # 根据设备规格码更新默认个性化语音值 ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA', '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA', '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA'] UidSetModel.objects.filter(ucode__in=ucode_list, is_custom_voice=0).update(is_custom_voice=1) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def delAppLog(response): """ 定时删除app日志 @param response: 响应对象 @return: """ nowTime = int(time.time()) try: cursor = connection.cursor() month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据 sql = 'DELETE FROM `app_log` WHERE add_time<{}'.format(month_ago_time) cursor.execute(sql) cursor.close() return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def uid_cloud_storage_upload_count(response): try: now_time = int(time.time()) local_time = LocalDateTimeUtil.get_before_days_timestamp(now_time) format_str = '%Y-%m-%d' date_str = LocalDateTimeUtil.time_stamp_to_time(local_time, format_str) start_time, end_time = LocalDateTimeUtil.get_start_and_end_time(date_str, format_str) cs_uid_qs = UID_Bucket.objects.filter(addTime__gte=int(1669824000)).values('uid') if not cs_uid_qs.exists(): return response.json(0) for item in cs_uid_qs: uid = item['uid'] cloud_log_qs = CloudLogModel.objects.filter(uid=uid, operation=r'cloudstorage/storeplaylist', time__gte=start_time, time__lte=end_time) cloud_log_qs = cloud_log_qs.values('uid')[0:1] if not cloud_log_qs.exists(): continue count_data = {'uid': uid, 'count': cloud_log_qs.count(), 'created_time': end_time, 'updated_time': end_time} UidCloudStorageCount.objects.create(**count_data) return response.json(0) except Exception as e: LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500) @staticmethod def delAccessLog(response): try: cursor = connection.cursor() # 删除7天前的数据 last_week = LocalDateTimeUtil.get_last_week() sql = 'DELETE FROM access_log WHERE time < %s limit %s' cursor.execute(sql, [last_week, 10000]) # 关闭游标 cursor.close() connection.close() return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def delPushInfo(response): now_time = int(time.time()) cursor = connections['mysql02'].cursor() try: # 当前时间转日期 local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date()) # 根据日期获取周几 week_val = LocalDateTimeUtil.date_to_week(local_date_now) # 根据当前时间获取7天前时间戳 expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7) # 每次删除条数 size = 10000 # 删除7天前的数据 sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s " for i in range(6): cursor.execute(sql, [expiration_time, size]) if week_val == 1: sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s " if week_val == 2: sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s " if week_val == 3: sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s " if week_val == 4: sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s " if week_val == 5: sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s " if week_val == 6: sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s " if week_val == 7: sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s " for i in range(5): cursor.execute(sql, [expiration_time, size]) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def delVodHls(response): nowTime = int(time.time()) try: CronDelDataView.del_vod_hls_tag() cursor = connection.cursor() month_ago_time = nowTime - 3 * 30 * 24 * 60 * 60 # 删除3个月前的数据 sql = 'DELETE FROM `vod_hls` WHERE endTime<{} LIMIT 50000'.format(month_ago_time) cursor.execute(sql) cursor.close() # 删除vod_hls分表数据 split_vod_hls_obj = SplitVodHlsObject() split_vod_hls_obj.del_vod_hls_data(end_time__lt=month_ago_time) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def del_vod_hls_tag(): """ 删除AI标签记录 """ e_time = LocalDateTimeUtil.get_before_days_timestamp(int(time.time()), 30) VodHlsTagType.objects.filter(created_time__lt=e_time).delete() VodHlsTag.objects.filter(created_time__lt=e_time).delete() @staticmethod def delCloudLog(response): nowTime = int(time.time()) cursor = connection.cursor() try: # 删除3个月前的数据 sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format( nowTime - 3 * 30 * 24 * 60 * 60) cursor.execute(sql) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def delTesterDevice(response): try: userID_list = [ 'tech01@ansjer.com', 'tech02@ansjer.com', 'tech03@ansjer.com', 'tech04@ansjer.com', 'tech05@ansjer.com', 'tech06@ansjer.com', 'tech07@ansjer.com', 'tech08@ansjer.com', 'tech09@ansjer.com', 'tech10@ansjer.com', 'fix01@ansjer.com', 'fix02@ansjer.com', 'fix03@ansjer.com', 'fix04@ansjer.com', 'fix05@ansjer.com'] device_user = Device_User.objects.filter(username__in=userID_list) device_info_qs = Device_Info.objects.filter( userID__in=device_user).values('UID') uid_list = [] for device_info in device_info_qs: uid_list.append(device_info['UID']) with transaction.atomic(): # 删除设备云存相关数据 UidSetModel.objects.filter(uid__in=uid_list).delete() UID_Bucket.objects.filter(uid__in=uid_list).delete() Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete() Order_Model.objects.filter(UID__in=uid_list).delete() StsCrdModel.objects.filter(uid__in=uid_list).delete() VodHlsModel.objects.filter(uid__in=uid_list).delete() # 删除vod_hls分表数据 split_vod_hls_obj = SplitVodHlsObject() split_vod_hls_obj.del_vod_hls_data(uid__in=uid_list) ExperienceContextModel.objects.filter(uid__in=uid_list).delete() Device_Info.objects.filter(userID__in=device_user).delete() return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def del_device_log(response): """ 定时删除设备日志 @param response: 响应对象 @return: """ nowTime = int(time.time()) try: cursor = connection.cursor() month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据 sql = 'DELETE FROM `device_log` WHERE unix_timestamp(add_time)<{}'.format(month_ago_time) cursor.execute(sql) cursor.close() return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) class CronUpdateDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态 return self.updateUnusedUidBucket(response) elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态 return self.updateUnusedAiService(response) elif operation == 'updateIcloudService': # 定时更新过期云盘套餐使用状态 return self.updateIcloudService(response) elif operation == 'reqUpdateSerialStatus': # 定时请求更新序列号状态 return self.reqUpdateSerialStatus(response) elif operation == 'updateSerialStatus': # 更新序列号状态 return self.updateSerialStatus(request_dict, response) elif operation == 'reset-region-id': # 重置地区id return self.reset_region_id(request_dict, response) elif operation == 'updateExperienceMeal': # 定时修改体验套餐有效期为1个月 return self.update_experience_meal(request_dict, response) else: return response.json(404) @staticmethod def updateUnusedUidBucket(response): """ 监控云存套餐过期修改状态 @param response: @return: """ # 定时更新已过期套餐修改状态为2 now_time = int(time.time()) expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time) expired_uid_bucket = expired_uid_bucket.filter(~Q(use_status=2)).values('id') if expired_uid_bucket.exists(): expired_uid_bucket.update(use_status=2) # 监控有未使用套餐则自动生效 expired_uid_buckets = UID_Bucket.objects.filter(endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000] for expired_uid_bucket in expired_uid_buckets: unuseds = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).values( "id", "uid", "channel", "addTime", "expire", "num", "bucket_id").order_by('addTime') if not unuseds.exists(): continue unused = unuseds[0] try: with transaction.atomic(): count_unused = Unused_Uid_Meal.objects.filter(uid=expired_uid_bucket['uid']).count() has_unused = 1 if count_unused > 1 else 0 end_time = CommonService.calcMonthLater(unused['expire'] * unused['num']) UID_Bucket.objects.filter( uid=expired_uid_bucket['uid']).update( channel=unused['channel'], endTime=end_time, bucket_id=unused['bucket_id'], updateTime=now_time, use_status=1, has_unused=has_unused) Unused_Uid_Meal.objects.filter(id=unused['id']).delete() StsCrdModel.objects.filter(uid=expired_uid_bucket['uid']).delete() # 删除sts记录 except Exception as e: print(repr(e)) continue return response.json(0) @staticmethod def updateUnusedAiService(response): now_time = int(time.time()) ai_service_qs = AiService.objects.filter( endTime__lte=now_time, use_status=1).values( 'id', 'uid')[ 0:200] for ai_service in ai_service_qs: try: with transaction.atomic(): AiService.objects.filter( id=ai_service['id']).update( use_status=2) # 更新过期ai订单状态 # 如果存在未使用套餐,更新为使用 unused_ai_service = AiService.objects.filter( uid=ai_service['uid'], use_status=0).order_by('addTime')[ :1].values( 'id', 'endTime') if unused_ai_service.exists(): # 未使用套餐的endTime在购买的时候保存为有效时间 effective_day = unused_ai_service[0]['endTime'] endTime = now_time + effective_day AiService.objects.filter( id=unused_ai_service[0]['id']).update( use_status=1, endTime=endTime, updTime=now_time) except Exception: continue return response.json(0) @staticmethod def updateIcloudService(response): """ 监控云盘套餐过期修改状态 @param response: @return: """ # 定时更新已过期套餐修改状态为2 now_time = int(time.time()) try: IcloudService.objects.filter(Q(end_time__lte=now_time), ~Q(end_time=0), ~Q(use_status=1)).update(use_status=1) return response.json(0) except Exception as e: return response.json(500) @classmethod def reqUpdateSerialStatus(cls, response): redis_obj = RedisObject() # 更新已使用序列号其他服务器的状态 used_serial_redis_list = redis_obj.lrange(USED_SERIAL_REDIS_LIST, 0, -1) # 读取redis已使用序列号 if used_serial_redis_list: LOGGER.info('---请求更新已使用序列号列表---used_serial_redis_list:{}'.format(used_serial_redis_list)) used_serial_redis_list = [str(i, 'utf-8') for i in used_serial_redis_list] cls.do_request_function(used_serial_redis_list, 3) # 更新未使用序列号其他服务器的状态 unused_serial_redis_list = redis_obj.lrange(UNUSED_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号 if unused_serial_redis_list: LOGGER.info('---请求更新未使用序列号列表---unused_serial_redis_list:{}'.format(unused_serial_redis_list)) unused_serial_redis_list = [str(i, 'utf-8') for i in unused_serial_redis_list] cls.do_request_function(unused_serial_redis_list, 1) # 重置地区id reset_region_id_serial_redis_list = redis_obj.lrange(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号 if reset_region_id_serial_redis_list: LOGGER.info('---请求重置地区id的序列号列表---:{}'.format(reset_region_id_serial_redis_list)) reset_region_id_serial_redis_list = [str(i, 'utf-8') for i in reset_region_id_serial_redis_list] cls.do_request_reset_region_id(reset_region_id_serial_redis_list) return response.json(0) @staticmethod def do_request_function(serial_redis_list, status): """ 请求更新序列号状态 @param serial_redis_list: 序列号redis列表 @param status: 状态, 1: 未使用, 3: 已占用 """ data = { 'serial_redis_list': str(serial_redis_list), 'status': status } # 确认域名列表 orders_domain_name_list = CommonService.get_orders_domain_name_list() redis_obj = RedisObject() LOGGER.info('---请求更新序列号线程---data:{},orders_domain_name_list:{}'.format(data, orders_domain_name_list)) try: requests_failed_flag = False # 请求失败标志位 for domain_name in orders_domain_name_list: url = '{}cron/update/updateSerialStatus'.format(domain_name) response = requests.post(url=url, data=data, timeout=5) LOGGER.info('---请求更新序列号响应时间---:{}'.format(response.elapsed.total_seconds())) result = response.json() if result['result_code'] != 0: # 请求失败标志位置位 requests_failed_flag = True break # 状态为未使用,重置美洲服的地区id if status == 1: # 美洲服直接更新 if CONFIG_INFO == CONFIG_US: DeviceDomainRegionModel.objects.filter(~Q(region_id=0), serial_number__in=serial_redis_list). \ update(region_id=0) else: # 其他服请求到美洲服更新 req_url = 'https://www.dvema.com/cron/update/reset-region-id' req_data = { 'serial_redis_list': str(serial_redis_list) } response = requests.post(url=req_url, data=req_data, timeout=5) LOGGER.info('---请求重置地区id响应时间---:{}'.format(response.elapsed.total_seconds())) result = response.json() if result['result_code'] != 0: # 请求失败标志位置位 requests_failed_flag = True break if not requests_failed_flag: # 请求成功删除redis序列号 if status == 1: for i in serial_redis_list: redis_obj.lrem(UNUSED_SERIAL_REDIS_LIST, 0, i) elif status == 3: for i in serial_redis_list: redis_obj.lrem(USED_SERIAL_REDIS_LIST, 0, i) except Exception as e: LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e))) @staticmethod def do_request_reset_region_id(reset_region_id_serial_redis_list): """ 请求重置地区id @param reset_region_id_serial_redis_list: 序列号redis列表 """ redis_obj = RedisObject() requests_failed_flag = False # 请求失败标志位 data = { 'serial_redis_list': str(reset_region_id_serial_redis_list), } url = 'https://www.dvema.com/cron/update/reset-region-id' try: response = requests.post(url=url, data=data, timeout=5) result = response.json() if result['result_code'] != 0: # 请求失败标志位置位 requests_failed_flag = True if not requests_failed_flag: # 请求成功删除redis序列号 for serial in reset_region_id_serial_redis_list: redis_obj.lrem(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, serial) except Exception as e: LOGGER.info('---请求重置地区id异常---:{}'.format(repr(e))) @staticmethod def updateSerialStatus(request_dict, response): """ 更新序列号状态 @param request_dict: 请求参数 @request_dict serial_redis_list: 序列号redis列表 @request_dict status: 状态, 1: 未使用, 3: 已占用 @param response: 响应对象 """ serial_redis_list = request_dict.get('serial_redis_list', None) status = request_dict.get('status', None) LOGGER.info('---更新序列号状态参数---serial_redis_list:{},status:{}'.format(serial_redis_list, status)) if not all([serial_redis_list, status]): return response.json(444) now_time = int(time.time()) try: serial_redis_list = eval(serial_redis_list) CompanySerialModel.objects.filter(serial_number__in=serial_redis_list).update(status=int(status), update_time=now_time) return response.json(0) except Exception as e: LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e))) return response.json(500) @staticmethod def reset_region_id(request_dict, response): """ 重置地区id @param request_dict: 请求参数 @request_dict serial_redis_list: 序列号redis列表 @param response: 响应对象 """ serial_redis_list = request_dict.get('serial_redis_list', None) LOGGER.info('---重置地区id参数---serial_redis_list:{}'.format(serial_redis_list)) if not serial_redis_list: return response.json(444) try: serial_redis_list = eval(serial_redis_list) DeviceDomainRegionModel.objects.filter(serial_number__in=serial_redis_list).update(region_id=0) return response.json(0) except Exception as e: LOGGER.info('---重置地区id异常---:{}'.format(repr(e))) return response.json(500) @staticmethod def update_experience_meal(request_dict, response): """ 定时修改体验套餐有效期为1个月 @param request_dict: 请求参数 @param response: 响应对象 """ try: meal_qs = Store_Meal.objects.filter(is_show=0, pay_type=10, expire=3, day=7).values('id', 'bucket') meal_id = meal_qs[0]['id'] lang_qs = Lang.objects.filter(store_meal__id=meal_id).values('lang') for item in lang_qs: lang = item['lang'] if lang == 'cn': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update(content='一个月免费套餐') elif lang == 'en': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update(content='1-Month plan (free trial)') elif lang == 'es': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='Plan de 1 mes (prueba gratuita)') elif lang == 'fr': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='Forfait de 1 mois (essai gratuit)') elif lang == 'de': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='1 Monat Paket (kostenlose Testversion) ') elif lang == 'cn_tw': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update(content='一個月套餐(免費試用)') elif lang == 'pt': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='Plano de 1 mês (teste gratuito)') elif lang == 'ru': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='Тариф 1 месяц (бесплатный пробный период)') elif lang == 'ja': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update(content='1ヶ月プラン(無料試用)') elif lang == 'it': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='Pacchetto di 1 mese (prova gratuita)') elif lang == 'pl': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='jednomiesięczny pakiet (bezpłatny próbny)') elif lang == 'nl': Lang.objects.filter(lang=lang, store_meal__id=meal_id).update( content='Pakket van 1 maand (gratis proefperiode)') VodBucketModel.objects.filter(id=meal_qs[0]['bucket']).update(content='国内存储桶免费体验30天,录像保存7天') meal_qs.update(expire=1) return response.json(0) except Exception as e: LOGGER.info('---修改体验套餐有效期---:{}'.format(repr(e))) return response.json(500) class CronCollectDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'collectPlayBack': # 定时保存云存视频回放 return self.collect_play_back(response) elif operation == 'collectDeviceUser': # 定时保存用户数据 return self.collect_device_user(response) elif operation == 'collectOrder': # 定时保存订单数据 return self.collect_order(response) elif operation == 'collectIcloudOrder': # 定时保存云盘订单数据 return self.collect_icloud_order(response) elif operation == 'collectDeviceInfo': # 定时保存设备数据 return self.collect_device_info(response) elif operation == 'collectFlowInfo': # 定时保存设备数据 return self.collect_flow_info(response) else: return response.json(404) @staticmethod def collect_play_back(response): try: now_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) this_month_str = datetime.datetime(today.year, today.month, 1) this_month_stamp = CommonService.str_to_timestamp(this_month_str.strftime('%Y-%m-%d %H:%M:%S')) video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time, startTime__lt=end_time, playMode='cloud').values('uid').annotate( play_duration=Sum('duration'), play_frequency=Count('uid')) with transaction.atomic(): for item in video_play_back_time_qs: vod_hls_summary_qs = VodHlsSummary.objects.filter(uid=item['uid'], time=this_month_stamp) if vod_hls_summary_qs.exists(): vod_hls_summary = vod_hls_summary_qs.first() vod_hls_summary.play_duration += item['play_duration'] vod_hls_summary.play_frequency += 1 vod_hls_summary.updated_time = now_time vod_hls_summary.save() else: VodHlsSummary.objects.create(uid=item['uid'], time=this_month_stamp, created_time=now_time, play_duration=item['play_duration'], play_frequency=1, updated_time=now_time) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def collect_device_user(response): try: created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) increase_user_qs = Device_User.objects.filter(data_joined__year=today.year, data_joined__month=today.month, data_joined__day=today.day).values('region_country') start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) active_user_qs = UserExModel.objects.filter(updTime__gte=start_time, updTime__lt=end_time).values( 'userID__region_country') country_qs = CountryModel.objects.all().values('id', 'region__name', 'country_name') country_dict = {} continent_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] continent_dict[item['country_name']] = item['region__name'] with transaction.atomic(): if increase_user_qs.exists(): increase_user_count = increase_user_qs.count() increase_user_country_list = increase_user_qs.values('region_country').annotate( count=Count('region_country')).order_by('count') increase_user_country_dict = {} increase_user_continent_dict = {} for item in increase_user_country_list: country_name = country_dict.get(item['region_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') increase_user_country_dict[country_name] = item['count'] if continent_name not in increase_user_continent_dict: increase_user_continent_dict[continent_name] = 0 increase_user_continent_dict[continent_name] += item['count'] DeviceUserSummary.objects.create(time=start_time, count=increase_user_count, country=increase_user_country_dict, created_time=created_time, continent=increase_user_continent_dict) if active_user_qs.exists(): active_user_count = active_user_qs.count() active_user_country_list = active_user_qs.values('userID__region_country').annotate( count=Count('userID__region_country')).order_by('count') active_user_country_dict = {} active_user_continent_dict = {} for item in active_user_country_list: country_name = country_dict.get(item['userID__region_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') active_user_country_dict[country_name] = item['count'] if continent_name not in active_user_continent_dict: active_user_continent_dict[continent_name] = 0 active_user_continent_dict[continent_name] += item['count'] DeviceUserSummary.objects.create(time=start_time, query_type=1, count=active_user_count, country=active_user_country_dict, created_time=created_time, continent=active_user_continent_dict) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def collect_order(response): try: created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time, status=1).values('UID', 'order_type', 'store_meal_name', 'price', 'addTime', 'currency').order_by( 'addTime') uid_list = [] all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1).values('UID') for item in all_order_qs: if item['UID'] not in uid_list: uid_list.append(item['UID']) # 国家表数据 country_qs = CountryModel.objects.values('id', 'country_name') country_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] # 设备类型数据 device_type_qs = DeviceTypeModel.objects.values('name', 'type') device_type_dict = {} for item in device_type_qs: device_type_dict[item['type']] = item['name'] with transaction.atomic(): for item in order_qs: is_pay = 0 price = float(item['price']) currency = item['currency'] uid_set_qs = UidSetModel.objects.filter(uid=item['UID']).values('tb_country') country_id = uid_set_qs[0]['tb_country'] if uid_set_qs.exists() else 0 country_name = country_dict.get(country_id, '未知国家') order_type = item['order_type'] if order_type == '4' or order_type == 4: continue device_info_qs = Device_Info.objects.filter(UID=item['UID']).values('Type') device_type_id = device_info_qs[0]['Type'] if device_info_qs.exists() else 0 device_type_name = device_type_dict.get(device_type_id, '未知设备') store_meal_name = item['store_meal_name'] add_time_stamp = item['addTime'] add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp)) add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day) add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S')) if price == 0: is_pay = 1 order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=1, service_type=order_type) else: order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0, service_type=order_type) if item['UID'] not in uid_list: pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2, service_type=order_type) query_type = 2 else: pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3, service_type=order_type) query_type = 3 if pay_order_summary_qs.exists(): pay_order_summary = pay_order_summary_qs.first() pay_order_summary.count += 1 temp_total = eval(pay_order_summary.total) if currency not in temp_total: temp_total[currency] = price else: temp_total[currency] = round(temp_total[currency] + price, 2) pay_order_summary.total = temp_total country_temp_dict = eval(pay_order_summary.country) if country_name in country_temp_dict: country_temp_dict[country_name]['数量'] += 1 if currency not in country_temp_dict[country_name]: country_temp_dict[country_name][currency] = price else: country_temp_dict[country_name][currency] = round( country_temp_dict[country_name][currency] + price, 2) else: country_temp_dict[country_name] = {'数量': 1, currency: price} pay_order_summary.country = country_temp_dict device_type_temp_dict = eval(pay_order_summary.device_type) if device_type_name in device_type_temp_dict: device_type_temp_dict[device_type_name]['数量'] += 1 if currency not in device_type_temp_dict[device_type_name]: device_type_temp_dict[device_type_name][currency] = price else: device_type_temp_dict[device_type_name][currency] = round( device_type_temp_dict[device_type_name][currency] + price, 2) else: device_type_temp_dict[device_type_name] = {'数量': 1, currency: price} pay_order_summary.device_type = device_type_temp_dict store_meal_temp_dict = eval(pay_order_summary.store_meal) if store_meal_name in store_meal_temp_dict: store_meal_temp_dict[store_meal_name]['数量'] += 1 if currency not in store_meal_temp_dict[store_meal_name]: store_meal_temp_dict[store_meal_name][currency] = price else: store_meal_temp_dict[store_meal_name][currency] = round( store_meal_temp_dict[store_meal_name][currency] + price, 2) else: store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price} pay_order_summary.store_meal = store_meal_temp_dict pay_order_summary.save() else: final_total = {currency: price} country_temp_dict = { country_name: { '数量': 1, currency: price } } device_type_temp_dict = { device_type_name: { '数量': 1, currency: price } } store_meal_temp_dict = { store_meal_name: { '数量': 1, currency: price } } OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type, service_type=order_type, total=final_total, country=country_temp_dict, created_time=created_time, device_type=device_type_temp_dict, store_meal=store_meal_temp_dict) if order_summary_qs.exists(): order_summary = order_summary_qs.first() order_summary.count += 1 temp_total = eval(order_summary.total) if currency not in temp_total: temp_total[currency] = price else: temp_total[currency] = round(temp_total[currency] + price, 2) order_summary.total = temp_total country_temp_dict = eval(order_summary.country) if country_name in country_temp_dict: if is_pay == 0: country_temp_dict[country_name]['数量'] += 1 if currency not in country_temp_dict[country_name]: country_temp_dict[country_name][currency] = price else: country_temp_dict[country_name][currency] = round( country_temp_dict[country_name][currency] + price, 2) else: country_temp_dict[country_name] += 1 else: if is_pay == 0: country_temp_dict[country_name] = {'数量': 1, currency: price} else: country_temp_dict[country_name] = 1 order_summary.country = country_temp_dict device_type_temp_dict = eval(order_summary.device_type) if device_type_name in device_type_temp_dict: if is_pay == 0: device_type_temp_dict[device_type_name]['数量'] += 1 if currency not in device_type_temp_dict[device_type_name]: device_type_temp_dict[device_type_name][currency] = price else: device_type_temp_dict[device_type_name][currency] = round( device_type_temp_dict[device_type_name][currency] + price, 2) else: device_type_temp_dict[device_type_name] += 1 else: if is_pay == 0: device_type_temp_dict[device_type_name] = {'数量': 1, currency: price} else: device_type_temp_dict[device_type_name] = 1 order_summary.device_type = device_type_temp_dict store_meal_temp_dict = eval(order_summary.store_meal) if store_meal_name in store_meal_temp_dict: if is_pay == 0: store_meal_temp_dict[store_meal_name]['数量'] += 1 if currency not in store_meal_temp_dict[store_meal_name]: store_meal_temp_dict[store_meal_name][currency] = price else: store_meal_temp_dict[store_meal_name][currency] = round( store_meal_temp_dict[store_meal_name][currency] + price, 2) else: store_meal_temp_dict[store_meal_name] += 1 else: if is_pay == 0: store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price} else: store_meal_temp_dict[store_meal_name] = 1 order_summary.store_meal = store_meal_temp_dict order_summary.save() else: final_total = {currency: price} if is_pay == 0: country_temp_dict = { country_name: { '数量': 1, currency: price } } device_type_temp_dict = { device_type_name: { '数量': 1, currency: price } } store_meal_temp_dict = { store_meal_name: { '数量': 1, currency: price } } else: device_type_temp_dict = { device_type_name: 1 } store_meal_temp_dict = { store_meal_name: 1 } country_temp_dict = { country_name: 1 } OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=is_pay, service_type=order_type, total=final_total, country=country_temp_dict, created_time=created_time, device_type=device_type_temp_dict, store_meal=store_meal_temp_dict) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def collect_icloud_order(response): try: order_type = 4 created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time, order_type=order_type, status=1).values('userID', 'store_meal_name', 'price', 'addTime', 'currency').order_by( 'addTime') user_list = [] all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1, order_type=order_type).values( 'userID') for item in all_order_qs: if item['userID'] not in user_list: user_list.append(item['userID']) # 国家表数据 country_qs = CountryModel.objects.values('id', 'country_name') country_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] with transaction.atomic(): for item in order_qs: price = float(item['price']) currency = item['currency'] user_qs = Device_User.objects.filter(userID=item['userID']).values('region_country') country_id = user_qs[0]['region_country'] if user_qs.exists() else 0 country_name = country_dict.get(country_id, '未知国家') store_meal_name = item['store_meal_name'] add_time_stamp = item['addTime'] add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp)) add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day) add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S')) order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0, service_type=order_type) if item['userID'] not in user_list: pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2, service_type=order_type) query_type = 2 else: pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3, service_type=order_type) query_type = 3 if pay_order_summary_qs.exists(): pay_order_summary = pay_order_summary_qs.first() pay_order_summary.count += 1 temp_total = eval(pay_order_summary.total) if currency not in temp_total: temp_total[currency] = price else: temp_total[currency] = round(temp_total[currency] + price, 2) pay_order_summary.total = temp_total country_temp_dict = eval(pay_order_summary.country) if country_name in country_temp_dict: country_temp_dict[country_name]['数量'] += 1 if currency not in country_temp_dict[country_name]: country_temp_dict[country_name][currency] = price else: country_temp_dict[country_name][currency] = round( country_temp_dict[country_name][currency] + price, 2) else: country_temp_dict[country_name] = {'数量': 1, currency: price} pay_order_summary.country = country_temp_dict store_meal_temp_dict = eval(pay_order_summary.store_meal) if store_meal_name in store_meal_temp_dict: store_meal_temp_dict[store_meal_name]['数量'] += 1 if currency not in store_meal_temp_dict[store_meal_name]: store_meal_temp_dict[store_meal_name][currency] = price else: store_meal_temp_dict[store_meal_name][currency] = round( store_meal_temp_dict[store_meal_name][currency] + price, 2) else: store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price} pay_order_summary.store_meal = store_meal_temp_dict pay_order_summary.save() else: final_total = {currency: price} country_temp_dict = { country_name: { '数量': 1, currency: price } } store_meal_temp_dict = { store_meal_name: { '数量': 1, currency: price } } OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type, service_type=order_type, total=final_total, country=country_temp_dict, created_time=created_time, device_type={}, store_meal=store_meal_temp_dict) if order_summary_qs.exists(): order_summary = order_summary_qs.first() order_summary.count += 1 temp_total = eval(order_summary.total) if currency not in temp_total: temp_total[currency] = price else: temp_total[currency] = round(temp_total[currency] + price, 2) order_summary.total = temp_total country_temp_dict = eval(order_summary.country) if country_name in country_temp_dict: country_temp_dict[country_name]['数量'] += 1 if currency not in country_temp_dict[country_name]: country_temp_dict[country_name][currency] = price else: country_temp_dict[country_name][currency] = round( country_temp_dict[country_name][currency] + price, 2) else: country_temp_dict[country_name] = {'数量': 1, currency: price} order_summary.country = country_temp_dict store_meal_temp_dict = eval(order_summary.store_meal) if store_meal_name in store_meal_temp_dict: store_meal_temp_dict[store_meal_name]['数量'] += 1 if currency not in store_meal_temp_dict[store_meal_name]: store_meal_temp_dict[store_meal_name][currency] = price else: store_meal_temp_dict[store_meal_name][currency] = round( store_meal_temp_dict[store_meal_name][currency] + price, 2) else: store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price} order_summary.store_meal = store_meal_temp_dict order_summary.save() else: final_total = {currency: price} country_temp_dict = { country_name: { '数量': 1, currency: price } } store_meal_temp_dict = { store_meal_name: { '数量': 1, currency: price } } OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=0, service_type=order_type, total=final_total, country=country_temp_dict, created_time=created_time, device_type={}, store_meal=store_meal_temp_dict) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def collect_device_info(response): try: created_time = int(time.time()) today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')) increase_device_qs = UidSetModel.objects.filter(addTime__gte=start_time, addTime__lt=end_time).values( 'tb_country', 'uid', 'device_type', 'cloud_vod', 'is_ai', 'mobile_4g', 'addTime') video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time, startTime__lt=end_time).values('uid') active_device_qs = UidSetModel.objects.filter(uid__in=video_play_back_time_qs).values('tb_country', 'addTime', 'device_type', 'cloud_vod', 'is_ai', 'mobile_4g', 'uid') increase_device_count = increase_device_qs.count() active_device_count = active_device_qs.count() # 国家表数据 country_qs = CountryModel.objects.values('id', 'country_name', 'region__name') country_dict = {} continent_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] continent_dict[item['country_name']] = item['region__name'] # 设备类型数据 device_type_qs = DeviceTypeModel.objects.values('name', 'type') device_type_dict = {} for item in device_type_qs: device_type_dict[item['type']] = item['name'] with transaction.atomic(): if increase_device_qs.exists(): # 国家大洲设备数据 increase_device_country_list = increase_device_qs.values('tb_country').annotate( count=Count('tb_country')).order_by('count') increase_device_country_dict = {} increase_device_continent_dict = {} for item in increase_device_country_list: country_name = country_dict.get(item['tb_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') increase_device_country_dict[country_name] = item['count'] if continent_name not in increase_device_continent_dict: increase_device_continent_dict[continent_name] = 0 increase_device_continent_dict[continent_name] += item['count'] # 设备类型数据 increase_device_type_list = increase_device_qs.values('device_type').annotate( count=Count('device_type')).order_by('count') increase_device_type_dict = {} for item in increase_device_type_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_type_dict[type_name] = item['count'] # 云存设备类型数据 increase_device_vod_list = increase_device_qs.filter(~Q(cloud_vod=2)).values( 'device_type').annotate( count=Count('device_type')).order_by('count') increase_device_vod_dict = {} for item in increase_device_vod_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_vod_dict[type_name] = item['count'] # AI设备类型数据 increase_device_ai_list = increase_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') increase_device_ai_dict = {} for item in increase_device_ai_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_ai_dict[type_name] = item['count'] # 联通设备类型数据 increase_device_unicom_list = increase_device_qs.filter(~Q(mobile_4g=2)).values( 'device_type').annotate( count=Count('device_type')).order_by('count') increase_device_unicom_dict = {} for item in increase_device_unicom_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') increase_device_unicom_dict[type_name] = item['count'] DeviceInfoSummary.objects.create(time=start_time, count=increase_device_count, query_type=0, created_time=created_time, country=increase_device_country_dict, continent=increase_device_continent_dict, vod_service=increase_device_vod_dict, ai_service=increase_device_ai_dict, unicom_service=increase_device_unicom_dict, device_type=increase_device_type_dict) if active_device_qs.exists(): # 国家大洲设备数据 active_device_country_list = active_device_qs.values('tb_country').annotate( count=Count('tb_country')).order_by('count') active_device_country_dict = {} active_device_continent_dict = {} for item in active_device_country_list: country_name = country_dict.get(item['tb_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') active_device_country_dict[country_name] = item['count'] if continent_name not in active_device_continent_dict: active_device_continent_dict[continent_name] = 0 active_device_continent_dict[continent_name] += item['count'] # 设备类型数据 active_device_type_list = active_device_qs.values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_type_dict = {} for item in active_device_type_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_type_dict[type_name] = item['count'] # 云存设备类型数据 active_device_vod_list = active_device_qs.filter(~Q(cloud_vod=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_vod_dict = {} for item in active_device_vod_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_vod_dict[type_name] = item['count'] # AI设备类型数据 active_device_ai_list = active_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_ai_dict = {} for item in active_device_ai_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_ai_dict[type_name] = item['count'] # 联通设备类型数据 active_device_unicom_list = active_device_qs.filter(~Q(mobile_4g=2)).values('device_type').annotate( count=Count('device_type')).order_by('count') active_device_unicom_dict = {} for item in active_device_unicom_list: type_name = device_type_dict.get(item['device_type'], '未知设备类型') active_device_unicom_dict[type_name] = item['count'] DeviceInfoSummary.objects.create(time=start_time, count=active_device_count, query_type=1, created_time=created_time, country=active_device_country_dict, continent=active_device_continent_dict, vod_service=active_device_vod_dict, ai_service=active_device_ai_dict, unicom_service=active_device_unicom_dict, device_type=active_device_type_dict) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def collect_flow_info(response): try: unicom_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').distinct().order_by('iccid') asy = threading.Thread(target=CronCollectDataView.thread_collect_flow, args=(unicom_qs,)) asy.start() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def thread_collect_flow(qs): try: unicom_api = UnicomObjeect() redis_obj = RedisObject() for item in qs: res = unicom_api.query_device_usage_history(**item) if res.status_code == 200: res_json = res.json() if res_json['code'] == 0: redis_dict = {} for data in res_json['data']['deviceUsageHistory']: year = data.get('year', None) month = data.get('month', None) flow = data.get('flowTotalUsage', None) if not all([year, month, flow]): continue file = str(year) + '-' + str(month) redis_dict[file] = flow key = 'monthly_flow_' + item['iccid'] if redis_dict: redis_obj.set_hash_data(key, redis_dict) except Exception as e: LOGGER.info('统计联通流量失败,时间为:{}'.format(int(time.time())))