#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # # Copyright (C) 2022 # # @Time : 2022/4/1 11:27 # @Author : ming # @Email : zhangdongming@asj6.wecom.work # @File : CronTaskController.py # @Software: PyCharm import datetime import time from django.db import connection, connections, transaction from django.db.models import Q, Sum, Count from django.views import View from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \ VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \ CountryModel from Object.ResponseObject import ResponseObject from Object.utils import LocalDateTimeUtil from Service.CommonService import CommonService class CronDelDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'delAccessLog': # 定时删除访问接口数据 return self.delAccessLog(response) elif operation == 'delPushInfo': # 定时删除推送数据 return self.delPushInfo(response) elif operation == 'delVodHls': # 定时删除云存播放列表 return self.delVodHls(response) elif operation == 'delCloudLog': # 定时删除云存接口数据 return self.delCloudLog(response) elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据 return self.delTesterDevice(response) else: return response.json(404) @staticmethod def delAccessLog(response): try: cursor = connection.cursor() # 删除7天前的数据 last_week = LocalDateTimeUtil.get_last_week() sql = 'DELETE FROM access_log WHERE time < %s limit %s' cursor.execute(sql, [last_week, 10000]) # 关闭游标 cursor.close() connection.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delPushInfo(response): now_time = int(time.time()) cursor = connections['mysql02'].cursor() try: # 当前时间转日期 local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date()) # 根据日期获取周几 week_val = LocalDateTimeUtil.date_to_week(local_date_now) # 根据当前时间获取7天前时间戳 expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7) # 每次删除条数 size = 10000 # 删除7天前的数据 sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s " for i in range(6): cursor.execute(sql, [expiration_time, size]) if week_val == 1: sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s " if week_val == 2: sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s " if week_val == 3: sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s " if week_val == 4: sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s " if week_val == 5: sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s " if week_val == 6: sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s " if week_val == 7: sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s " for i in range(5): cursor.execute(sql, [expiration_time, size]) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delVodHls(response): nowTime = int(time.time()) try: with transaction.atomic(): month_ago_time = nowTime - 30 * 24 * 60 * 60 # 删除1个月前的数据 vod_hls_qs = VodHlsModel.objects.filter(endTime__lte=month_ago_time) for vod_hls in vod_hls_qs: end_time = vod_hls.endTime end_time_str = datetime.datetime.fromtimestamp(int(end_time)) this_month_start = datetime.datetime(end_time_str.year, end_time_str.month, 1) this_month_start_stamp = CommonService.str_to_timestamp( this_month_start.strftime('%Y-%m-%d %H:%M:%S')) vod_hls_summary_qs = VodHlsSummary.objects.filter(time=this_month_start_stamp, uid=vod_hls.uid) if vod_hls_summary_qs.exists(): vod_hls_summary = vod_hls_summary_qs.first() vod_hls_summary.upload_duration = vod_hls_summary.upload_duration + vod_hls.sec vod_hls_summary.upload_frequency = vod_hls_summary.upload_frequency + 1 vod_hls_summary.save() else: VodHlsSummary.objects.create(time=this_month_start_stamp, uid=vod_hls.uid, upload_duration=vod_hls.sec, upload_frequency=1) vod_hls.delete() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delCloudLog(response): nowTime = int(time.time()) cursor = connection.cursor() try: # 删除3个月前的数据 sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format( nowTime - 3 * 30 * 24 * 60 * 60) cursor.execute(sql) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delTesterDevice(response): try: userID_list = [ 'tech01@ansjer.com', 'tech02@ansjer.com', 'tech03@ansjer.com', 'tech04@ansjer.com', 'tech05@ansjer.com', 'tech06@ansjer.com', 'tech07@ansjer.com', 'tech08@ansjer.com', 'tech09@ansjer.com', 'tech10@ansjer.com', 'fix01@ansjer.com', 'fix02@ansjer.com', 'fix03@ansjer.com', 'fix04@ansjer.com', 'fix05@ansjer.com'] device_user = Device_User.objects.filter(username__in=userID_list) device_info_qs = Device_Info.objects.filter( userID__in=device_user).values('UID') uid_list = [] for device_info in device_info_qs: uid_list.append(device_info['UID']) with transaction.atomic(): # 删除设备云存相关数据 UidSetModel.objects.filter(uid__in=uid_list).delete() UID_Bucket.objects.filter(uid__in=uid_list).delete() Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete() Order_Model.objects.filter(UID__in=uid_list).delete() StsCrdModel.objects.filter(uid__in=uid_list).delete() VodHlsModel.objects.filter(uid__in=uid_list).delete() ExperienceContextModel.objects.filter( uid__in=uid_list).delete() Device_Info.objects.filter(userID__in=device_user).delete() return response.json(0) except Exception as e: return response.json(500, repr(e)) class CronUpdateDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态 return self.updateUnusedUidBucket(response) elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态 return self.updateUnusedAiService(response) else: return response.json(404) @staticmethod def updateUnusedUidBucket(response): """ 监控云存套餐过期修改状态 @param response: @return: """ # 定时更新已过期套餐修改状态为2 now_time = int(time.time()) expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time) expired_uid_bucket = expired_uid_bucket.filter(~Q(use_status=2)).values('id') if expired_uid_bucket.exists(): expired_uid_bucket.update(use_status=2) # 监控有未使用套餐则自动生效 expired_uid_buckets = \ UID_Bucket.objects.filter(endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000] for expired_uid_bucket in expired_uid_buckets: unuseds = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).values( "id", "uid", "channel", "addTime", "expire", "num", "bucket_id").order_by('addTime')[0:1] if not unuseds.exists(): continue unused = unuseds[0] try: with transaction.atomic(): count_unused = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).count() has_unused = 1 if count_unused > 1 else 0 endTime = CommonService.calcMonthLater( unused['expire'] * unused['num']) UID_Bucket.objects.filter( uid=expired_uid_bucket['uid']).update( channel=unused['channel'], endTime=endTime, bucket_id=unused['bucket_id'], updateTime=now_time, use_status=1, has_unused=has_unused) Unused_Uid_Meal.objects.filter(id=unused['id']).delete() StsCrdModel.objects.filter( uid=expired_uid_bucket['uid']).delete() # 删除sts记录 except Exception as e: print(repr(e)) continue return response.json(0) @staticmethod def updateUnusedAiService(response): now_time = int(time.time()) ai_service_qs = AiService.objects.filter( endTime__lte=now_time, use_status=1).values( 'id', 'uid')[ 0:200] for ai_service in ai_service_qs: try: with transaction.atomic(): AiService.objects.filter( id=ai_service['id']).update( use_status=2) # 更新过期ai订单状态 # 如果存在未使用套餐,更新为使用 unused_ai_service = AiService.objects.filter( uid=ai_service['uid'], use_status=0).order_by('addTime')[ :1].values( 'id', 'endTime') if unused_ai_service.exists(): # 未使用套餐的endTime在购买的时候保存为有效时间 effective_day = unused_ai_service[0]['endTime'] endTime = now_time + effective_day AiService.objects.filter( id=unused_ai_service[0]['id']).update( use_status=1, endTime=endTime, updTime=now_time) except Exception: continue return response.json(0) class CronCollectDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'collectPlayBack': # 定时保存云存视频回放 return self.collect_play_back(response) if operation == 'collectDeviceUser': # 定时保存云存视频回放 return self.collect_device_user(response) else: return response.json(404) @staticmethod def collect_play_back(response): try: end_time = int(time.time()) start_time = end_time - 24 * 60 * 60 # 每天执行一次 today = datetime.datetime.today() this_month_str = datetime.datetime(today.year, today.month, 1) this_month_stamp = CommonService.str_to_timestamp(this_month_str.strftime('%Y-%m-%d %H:%M:%S')) video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time, startTime__lt=end_time, playMode='cloud').values('uid').annotate( play_duration=Sum('duration'), play_frequency=Count('uid')) with transaction.atomic(): for item in video_play_back_time_qs: vod_hls_summary_qs = VodHlsSummary.objects.filter(uid=item['uid'], time=this_month_stamp) if vod_hls_summary_qs.exists(): vod_hls_summary = vod_hls_summary_qs.first() vod_hls_summary.play_duration += item['play_duration'] vod_hls_summary.play_frequency += 1 vod_hls_summary.save() else: VodHlsSummary.objects.create(uid=item['uid'], time=this_month_stamp, play_duration=item['play_duration'], play_frequency=1) return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def collect_device_user(response): try: today = datetime.datetime.today() start_time = datetime.datetime(today.year, today.month, today.day) end_time = start_time + datetime.timedelta(days=1) increase_user_qs = Device_User.objects.filter(data_joined__gte=start_time, data_joined__lt=end_time).values( 'data_joined', 'region_country') active_user_qs = Device_User.objects.filter(last_login__gte=start_time, last_login__lt=end_time).values( 'last_login', 'region_country') start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')) country_qs = CountryModel.objects.all().values('id', 'region__name', 'country_name') country_dict = {} continent_dict = {} for item in country_qs: country_dict[item['id']] = item['country_name'] continent_dict[item['country_name']] = item['region__name'] with transaction.atomic(): for item in increase_user_qs: device_user_summary_qs = DeviceUserSummary.objects.filter(time=start_time, query_type=0) country_name = country_dict.get(item['region_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') if device_user_summary_qs.exists(): device_user_summary = device_user_summary_qs.first() country_temp_dict = eval(device_user_summary.country) continent_temp_dict = eval(device_user_summary.continent) if country_name in country_temp_dict: country_temp_dict[country_name] += 1 else: country_temp_dict[country_name] = 1 if continent_name in continent_temp_dict: continent_temp_dict[continent_name] += 1 else: continent_temp_dict[continent_name] = 1 device_user_summary.country = country_temp_dict device_user_summary.continent = continent_temp_dict device_user_summary.count += 1 device_user_summary.save() else: country_temp_dict = {country_name: 1} continent_temp_dict = {continent_name: 1} DeviceUserSummary.objects.create(time=start_time, count=1, country=country_temp_dict, continent=continent_temp_dict) for item in active_user_qs: device_user_summary_qs = DeviceUserSummary.objects.filter(time=start_time, query_type=1) country_name = country_dict.get(item['region_country'], '未知国家') continent_name = continent_dict.get(country_name, '未知大洲') if device_user_summary_qs.exists(): device_user_summary = device_user_summary_qs.first() country_temp_dict = eval(device_user_summary.country) continent_temp_dict = eval(device_user_summary.continent) if country_name in country_temp_dict: country_temp_dict[country_name] += 1 else: country_temp_dict[country_name] = 1 if continent_name in continent_temp_dict: continent_temp_dict[continent_name] += 1 else: continent_temp_dict[continent_name] = 1 device_user_summary.country = country_temp_dict device_user_summary.continent = continent_temp_dict device_user_summary.count += 1 device_user_summary.save() else: country_temp_dict = {country_name: 1} continent_temp_dict = {continent_name: 1} DeviceUserSummary.objects.create(time=start_time, query_type=1, count=1, country=country_temp_dict, continent=continent_temp_dict) return response.json(0) except Exception as e: return response.json(500, repr(e))