#!/usr/bin/python3.6 # -*- coding: utf-8 -*- # # Copyright (C) 2022 # # @Time : 2022/4/1 11:27 # @Author : ming # @Email : zhangdongming@asj6.wecom.work # @File : CronTaskController.py # @Software: PyCharm import datetime import time from django.db import connection, connections, transaction from django.views import View from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \ VodHlsModel, ExperienceContextModel, AiService from Object.ResponseObject import ResponseObject from Object.utils import LocalDateTimeUtil from Service.CommonService import CommonService class CronDelDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'delAccessLog': # 定时删除访问接口数据 return self.delAccessLog(response) elif operation == 'delPushInfo': # 定时删除推送数据 return self.delPushInfo(response) elif operation == 'delVodHls': # 定时删除云存播放列表 return self.delVodHls(response) elif operation == 'delCloudLog': # 定时删除云存接口数据 return self.delCloudLog(response) elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据 return self.delTesterDevice(response) else: return response.json(404) @staticmethod def delAccessLog(response): try: cursor = connection.cursor() # 删除一个月前的数据 last_month = LocalDateTimeUtil.get_last_month() sql = 'DELETE FROM access_log WHERE time < %s limit %s' cursor.execute(sql, [last_month, 10000]) # 关闭游标 cursor.close() connection.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delPushInfo(response): now_time = int(time.time()) cursor = connections['mysql02'].cursor() try: # 当前时间转日期 local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date()) # 根据日期获取周几 week_val = LocalDateTimeUtil.date_to_week(local_date_now) # 根据当前时间获取7天前时间戳 expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7) # 每次删除条数 size = 10000 # 删除7天前的数据 sql = "DELETE FROM equipment_info WHERE eventTime<= %s LIMIT %s " for i in range(6): cursor.execute(sql, [expiration_time, size]) if week_val == 1: sql = "DELETE FROM equipment_info_sunday WHERE event_time<= %s LIMIT %s " if week_val == 2: sql = "DELETE FROM equipment_info_monday WHERE event_time<= %s LIMIT %s " if week_val == 3: sql = "DELETE FROM equipment_info_tuesday WHERE event_time<= %s LIMIT %s " if week_val == 4: sql = "DELETE FROM equipment_info_wednesday WHERE event_time<= %s LIMIT %s " if week_val == 5: sql = "DELETE FROM equipment_info_thursday WHERE event_time<= %s LIMIT %s " if week_val == 6: sql = "DELETE FROM equipment_info_friday WHERE event_time<= %s LIMIT %s " if week_val == 7: sql = "DELETE FROM equipment_info_saturday WHERE event_time<= %s LIMIT %s " for i in range(5): cursor.execute(sql, [expiration_time, size]) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delVodHls(response): nowTime = int(time.time()) cursor = connection.cursor() try: # 删除3个月前的数据 sql = "DELETE FROM `vod_hls` WHERE endTime<={} LIMIT 50000".format( nowTime - 3 * 30 * 24 * 60 * 60) cursor.execute(sql) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delCloudLog(response): nowTime = int(time.time()) cursor = connection.cursor() try: # 删除3个月前的数据 sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format( nowTime - 3 * 30 * 24 * 60 * 60) cursor.execute(sql) # 关闭游标 cursor.close() return response.json(0) except Exception as e: return response.json(500, repr(e)) @staticmethod def delTesterDevice(response): try: userID_list = [ 'tech01@ansjer.com', 'tech02@ansjer.com', 'tech03@ansjer.com', 'tech04@ansjer.com', 'tech05@ansjer.com', 'tech06@ansjer.com', 'tech07@ansjer.com', 'tech08@ansjer.com', 'tech09@ansjer.com', 'tech10@ansjer.com', 'fix01@ansjer.com', 'fix02@ansjer.com', 'fix03@ansjer.com', 'fix04@ansjer.com', 'fix05@ansjer.com'] device_user = Device_User.objects.filter(username__in=userID_list) device_info_qs = Device_Info.objects.filter( userID__in=device_user).values('UID') uid_list = [] for device_info in device_info_qs: uid_list.append(device_info['UID']) with transaction.atomic(): # 删除设备云存相关数据 UidSetModel.objects.filter(uid__in=uid_list).delete() UID_Bucket.objects.filter(uid__in=uid_list).delete() Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete() Order_Model.objects.filter(UID__in=uid_list).delete() StsCrdModel.objects.filter(uid__in=uid_list).delete() VodHlsModel.objects.filter(uid__in=uid_list).delete() ExperienceContextModel.objects.filter( uid__in=uid_list).delete() Device_Info.objects.filter(userID__in=device_user).delete() return response.json(0) except Exception as e: return response.json(500, repr(e)) class CronUpdateDataView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'expiredUidBucket': # 定时更新过期云存套餐状态 return self.expiredUidBucket(response) elif operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态 return self.updateUnusedUidBucket(response) elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态 return self.updateUnusedAiService(response) else: return response.json(404) @staticmethod def expiredUidBucket(response): now_time = int(time.time()) expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time) id_list = expired_uid_bucket.values_list('id', flat=True) UID_Bucket.objects.filter(id__in=list(id_list)).update(use_status=2) return response.json(0) @staticmethod def updateUnusedUidBucket(response): now_time = int(time.time()) expired_uid_buckets = UID_Bucket.objects.filter( endTime__lte=now_time, has_unused=1).values( "id", "uid")[ 0:1000] for expired_uid_bucket in expired_uid_buckets: unuseds = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).values( "id", "uid", "channel", "addTime", "expire", "num", "bucket_id").order_by('addTime')[ 0:1] if not unuseds.exists(): continue unused = unuseds[0] try: with transaction.atomic(): count_unused = Unused_Uid_Meal.objects.filter( uid=expired_uid_bucket['uid']).count() has_unused = 1 if count_unused > 1 else 0 endTime = CommonService.calcMonthLater( unused['expire'] * unused['num']) UID_Bucket.objects.filter( uid=expired_uid_bucket['uid']).update( channel=unused['channel'], endTime=endTime, bucket_id=unused['bucket_id'], updateTime=now_time, use_status=1, has_unused=has_unused) Unused_Uid_Meal.objects.filter(id=unused['id']).delete() StsCrdModel.objects.filter( uid=expired_uid_bucket['uid']).delete() # 删除sts记录 except Exception: continue return response.json(0) @staticmethod def updateUnusedAiService(response): now_time = int(time.time()) ai_service_qs = AiService.objects.filter( endTime__lte=now_time, use_status=1).values( 'id', 'uid')[ 0:200] for ai_service in ai_service_qs: try: with transaction.atomic(): AiService.objects.filter( id=ai_service['id']).update( use_status=2) # 更新过期ai订单状态 # 如果存在未使用套餐,更新为使用 unused_ai_service = AiService.objects.filter( uid=ai_service['uid'], use_status=0).order_by('addTime')[ :1].values( 'id', 'endTime') if unused_ai_service.exists(): # 未使用套餐的endTime在购买的时候保存为有效时间 effective_day = unused_ai_service[0]['endTime'] endTime = now_time + effective_day AiService.objects.filter( id=unused_ai_service[0]['id']).update( use_status=1, endTime=endTime, updTime=now_time) except Exception: continue return response.json(0)