#!/usr/bin/python3.6
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 #
# @Time    : 2022/4/1 11:27
# @Author  : ming
# @Email   : zhangdongming@asj6.wecom.work
# @File    : CronTaskController.py
# @Software: PyCharm
"""Cron-triggered Django views that purge stale rows and roll over expired packages."""
import datetime
import logging
import time

from django.db import connection, connections, transaction
from django.views import View

from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \
    VodHlsModel, ExperienceContextModel, AiService
from Object.ResponseObject import ResponseObject
from Object.utils import LocalDateTimeUtil
from Service.CommonService import CommonService

LOGGER = logging.getLogger(__name__)

# Age threshold (3 x 30 days, in seconds) used when purging vod_hls / cloud_log.
_THREE_MONTHS_SECONDS = 3 * 30 * 24 * 60 * 60

# Per-weekday push tables, keyed by the value LocalDateTimeUtil.date_to_week()
# returns (1 -> sunday ... 7 -> saturday, as the original branch chain mapped them).
_PUSH_TABLE_BY_WEEK_VAL = {
    1: 'equipment_info_sunday',
    2: 'equipment_info_monday',
    3: 'equipment_info_tuesday',
    4: 'equipment_info_wednesday',
    5: 'equipment_info_thursday',
    6: 'equipment_info_friday',
    7: 'equipment_info_saturday',
}

# Test/repair accounts whose devices are wiped by delTesterDevice.
_TESTER_USERNAMES = (
    'tech01@ansjer.com',
    'tech02@ansjer.com',
    'tech03@ansjer.com',
    'tech04@ansjer.com',
    'tech05@ansjer.com',
    'tech06@ansjer.com',
    'tech07@ansjer.com',
    'tech08@ansjer.com',
    'tech09@ansjer.com',
    'tech10@ansjer.com',
    'fix01@ansjer.com',
    'fix02@ansjer.com',
    'fix03@ansjer.com',
    'fix04@ansjer.com',
    'fix05@ansjer.com',
)


class CronDelDataView(View):
    """Scheduled-deletion endpoints, selected by the ``operation`` URL kwarg."""

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request to the handler named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request to the handler named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Run the deletion task matching ``operation``; respond 404 if unknown.

        ``request_dict`` and ``request`` are accepted for interface
        compatibility but are not used by any handler.
        """
        response = ResponseObject()
        handlers = {
            'delAccessLog': self.delAccessLog,  # purge API access log
            'delPushInfo': self.delPushInfo,  # purge push data
            'delVodHls': self.delVodHls,  # purge cloud-storage playlists
            'delCloudLog': self.delCloudLog,  # purge cloud-storage API log
            'delTesterDevice': self.delTesterDevice,  # purge tester-account devices
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(404)
        return handler(response)

    @staticmethod
    def _execute_delete(response, sql, params):
        """Run one parameterized DELETE on the default DB and build the response.

        Returns ``response.json(0)`` on success, or ``response.json(500, repr(e))``
        if opening the cursor or executing the statement raises.
        """
        try:
            # The context manager closes the cursor even when execute() raises.
            with connection.cursor() as cursor:
                cursor.execute(sql, params)
            return response.json(0)
        except Exception as e:
            return response.json(500, repr(e))

    @staticmethod
    def delAccessLog(response):
        """Delete up to 10000 ``access_log`` rows older than one month."""
        try:
            last_month = LocalDateTimeUtil.get_last_month()
            sql = 'DELETE FROM access_log WHERE time < %s limit %s'
            try:
                with connection.cursor() as cursor:
                    cursor.execute(sql, [last_month, 10000])
            finally:
                # The original closed the connection explicitly after the
                # delete; do so on the failure path too.
                connection.close()
            return response.json(0)
        except Exception as e:
            return response.json(500, repr(e))

    @staticmethod
    def delPushInfo(response):
        """Delete push rows older than 7 days from the ``mysql02`` database.

        Runs 6 batches against ``equipment_info`` and then 5 batches against
        the per-weekday table for today, each batch limited to 10000 rows.
        """
        now_time = int(time.time())
        try:
            # Current timestamp -> date string -> day-of-week value.
            local_date_now = str(datetime.datetime.fromtimestamp(now_time).date())
            week_val = LocalDateTimeUtil.date_to_week(local_date_now)
            # Timestamp 7 days before now.
            expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
            # Rows deleted per batch.
            size = 10000
            params = [expiration_time, size]

            base_sql = "DELETE FROM equipment_info WHERE eventTime<= %s LIMIT %s "
            day_table = _PUSH_TABLE_BY_WEEK_VAL.get(week_val)
            if day_table is None:
                # NOTE(review): the original fell through to re-running the
                # base statement when week_val matched no branch; preserved.
                day_sql = base_sql
            else:
                # Table name comes from the fixed mapping above, never from input.
                day_sql = "DELETE FROM {} WHERE event_time<= %s LIMIT %s ".format(day_table)

            with connections['mysql02'].cursor() as cursor:
                for _ in range(6):
                    cursor.execute(base_sql, params)
                for _ in range(5):
                    cursor.execute(day_sql, params)
            return response.json(0)
        except Exception as e:
            return response.json(500, repr(e))

    @staticmethod
    def delVodHls(response):
        """Delete up to 50000 ``vod_hls`` rows whose endTime is over 3 months old."""
        cutoff = int(time.time()) - _THREE_MONTHS_SECONDS
        sql = "DELETE FROM `vod_hls` WHERE endTime<=%s LIMIT 50000"
        return CronDelDataView._execute_delete(response, sql, [cutoff])

    @staticmethod
    def delCloudLog(response):
        """Delete up to 50000 ``cloud_log`` rows whose time is over 3 months old."""
        cutoff = int(time.time()) - _THREE_MONTHS_SECONDS
        sql = "DELETE FROM `cloud_log` WHERE time<=%s LIMIT 50000"
        return CronDelDataView._execute_delete(response, sql, [cutoff])

    @staticmethod
    def delTesterDevice(response):
        """Delete all devices (and their cloud-storage data) owned by tester accounts.

        All deletions run in one transaction; any failure yields a 500 response.
        """
        try:
            device_user = Device_User.objects.filter(username__in=_TESTER_USERNAMES)
            uid_list = list(
                Device_Info.objects.filter(userID__in=device_user).values_list('UID', flat=True))
            with transaction.atomic():
                # Remove cloud-storage related data for these devices first.
                UidSetModel.objects.filter(uid__in=uid_list).delete()
                UID_Bucket.objects.filter(uid__in=uid_list).delete()
                Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete()
                Order_Model.objects.filter(UID__in=uid_list).delete()
                StsCrdModel.objects.filter(uid__in=uid_list).delete()
                VodHlsModel.objects.filter(uid__in=uid_list).delete()
                ExperienceContextModel.objects.filter(uid__in=uid_list).delete()
                Device_Info.objects.filter(userID__in=device_user).delete()
            return response.json(0)
        except Exception as e:
            return response.json(500, repr(e))


class CronUpdateDataView(View):
    """Scheduled-update endpoints, selected by the ``operation`` URL kwarg."""

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request to the handler named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request to the handler named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Run the update task matching ``operation``; respond 404 if unknown.

        ``request_dict`` and ``request`` are accepted for interface
        compatibility but are not used by any handler.
        """
        response = ResponseObject()
        handlers = {
            # mark expired cloud-storage packages
            'expiredUidBucket': self.expiredUidBucket,
            # activate unused cloud-storage packages for expired buckets
            'updateUnusedUidBucket': self.updateUnusedUidBucket,
            # activate unused AI packages for expired AI services
            'updateUnusedAiService': self.updateUnusedAiService,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(404)
        return handler(response)

    @staticmethod
    def expiredUidBucket(response):
        """Set ``use_status=2`` on every UID_Bucket whose endTime has passed."""
        now_time = int(time.time())
        # Single UPDATE; no need to round-trip the ids through Python.
        UID_Bucket.objects.filter(endTime__lte=now_time).update(use_status=2)
        return response.json(0)

    @staticmethod
    def updateUnusedUidBucket(response):
        """Activate the oldest unused meal for up to 1000 expired buckets.

        For each expired bucket flagged ``has_unused=1``, the earliest
        Unused_Uid_Meal (by addTime) for the same uid is applied to the
        bucket, removed from the unused table, and the uid's STS records are
        deleted. Each uid is handled in its own transaction; a failure is
        logged and the loop moves on to the next uid.
        """
        now_time = int(time.time())
        expired_uid_buckets = UID_Bucket.objects.filter(
            endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000]
        for expired_uid_bucket in expired_uid_buckets:
            uid = expired_uid_bucket['uid']
            unused = Unused_Uid_Meal.objects.filter(uid=uid).values(
                "id", "uid", "channel", "addTime", "expire", "num", "bucket_id"
            ).order_by('addTime').first()
            if unused is None:
                continue
            try:
                with transaction.atomic():
                    count_unused = Unused_Uid_Meal.objects.filter(uid=uid).count()
                    # Still flagged only if more meals remain after this one.
                    has_unused = 1 if count_unused > 1 else 0
                    endTime = CommonService.calcMonthLater(
                        unused['expire'] * unused['num'])
                    UID_Bucket.objects.filter(uid=uid).update(
                        channel=unused['channel'],
                        endTime=endTime,
                        bucket_id=unused['bucket_id'],
                        updateTime=now_time,
                        use_status=1,
                        has_unused=has_unused)
                    Unused_Uid_Meal.objects.filter(id=unused['id']).delete()
                    # Delete the STS records for this uid.
                    StsCrdModel.objects.filter(uid=uid).delete()
            except Exception:
                # Best effort per uid: record the failure, keep processing.
                LOGGER.exception('updateUnusedUidBucket failed for uid=%s', uid)
                continue
        return response.json(0)

    @staticmethod
    def updateUnusedAiService(response):
        """Expire up to 200 in-use AI services and activate their next unused one.

        Each service is handled in its own transaction; a failure is logged
        and the loop moves on to the next service.
        """
        now_time = int(time.time())
        ai_service_qs = AiService.objects.filter(
            endTime__lte=now_time, use_status=1).values('id', 'uid')[0:200]
        for ai_service in ai_service_qs:
            try:
                with transaction.atomic():
                    # Mark the expired AI order as expired.
                    AiService.objects.filter(id=ai_service['id']).update(use_status=2)
                    # If an unused package exists, switch the oldest one to in-use.
                    unused_ai_service = AiService.objects.filter(
                        uid=ai_service['uid'], use_status=0
                    ).order_by('addTime').values('id', 'endTime').first()
                    if unused_ai_service is not None:
                        # For unused packages, endTime was saved at purchase
                        # time as the validity period, not an absolute time.
                        effective_day = unused_ai_service['endTime']
                        endTime = now_time + effective_day
                        AiService.objects.filter(id=unused_ai_service['id']).update(
                            use_status=1, endTime=endTime, updTime=now_time)
            except Exception:
                # Best effort per service: record the failure, keep processing.
                LOGGER.exception(
                    'updateUnusedAiService failed for id=%s', ai_service['id'])
                continue
        return response.json(0)