Browse Source

优化定时删除推送消息接口

locky 1 year ago
parent
commit
65398e0acb
1 changed files with 101 additions and 54 deletions
  1. 101 54
      Controller/Cron/CronTaskController.py

+ 101 - 54
Controller/Cron/CronTaskController.py

@@ -10,14 +10,14 @@
 import datetime
 import threading
 import time
-import logging
+
 import requests
 from django.db import connection, connections, transaction
 from django.db.models import Q, Sum, Count
 from django.views import View
 
 from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST, CONFIG_INFO, CONFIG_US, \
-    RESET_REGION_ID_SERIAL_REDIS_LIST
+    RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER
 from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \
     VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \
     CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
@@ -30,7 +30,6 @@ from Service.CommonService import CommonService
 from Service.VodHlsService import SplitVodHlsObject
 from Object.UnicomObject import UnicomObjeect
 
-LOGGER = logging.getLogger('info')
 
 class CronDelDataView(View):
     def get(self, request, *args, **kwargs):
@@ -61,6 +60,8 @@ class CronDelDataView(View):
             return self.UpdateConfiguration(response)
         elif operation == 'cloud-log':
             return self.uid_cloud_storage_upload_count(response)
+        elif operation == 'delDeviceLog':  # 定时删除设备日志
+            return self.del_device_log(response)
         else:
             return response.json(404)
 
@@ -111,7 +112,7 @@ class CronDelDataView(View):
             UidSetModel.objects.filter(ucode__in=ucode_list, is_ai=2).update(is_ai=1)
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def delAppLog(response):
@@ -129,7 +130,7 @@ class CronDelDataView(View):
             cursor.close()
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def uid_cloud_storage_upload_count(response):
@@ -170,12 +171,11 @@ class CronDelDataView(View):
             connection.close()
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
-    @staticmethod
-    def delPushInfo(response):
+    @classmethod
+    def delPushInfo(cls, response):
         now_time = int(time.time())
-        cursor = connections['mysql02'].cursor()
         try:
             # 当前时间转日期
             local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date())
@@ -183,33 +183,66 @@ class CronDelDataView(View):
             week_val = LocalDateTimeUtil.date_to_week(local_date_now)
             # 根据当前时间获取7天前时间戳
             expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
-            # 每次删除条数
-            size = 10000
-            # 删除7天前的数据
-            sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s "
-            for i in range(6):
-                cursor.execute(sql, [expiration_time, size])
+
+            # 异步删除推送消息
+            kwargs = {
+                'week_val': week_val,
+                'expiration_time': expiration_time
+            }
+            del_push_info_thread = threading.Thread(
+                target=cls.del_push_info_data,
+                kwargs=kwargs)
+            del_push_info_thread.start()
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def del_push_info_data(**kwargs):
+        cursor = connections['mysql02'].cursor()
+
+        # 获取删除星期列表
+        week_val = kwargs['week_val']
+        del_week_val_list = [i for i in range(1, 8)]
+        # 移除当天和前后两天
+        del_week_val_list.remove(week_val)
+
+        if week_val == 1:
+            pre_week_val = 7
+        else:
+            pre_week_val = week_val - 1
+        del_week_val_list.remove(pre_week_val)
+
+        if week_val == 7:
+            nex_week_val = 1
+        else:
+            nex_week_val = week_val + 1
+        del_week_val_list.remove(nex_week_val)
+
+        expiration_time = kwargs['expiration_time']
+        # 每次删除条数
+        size = 5000
+        # 删除7天前的数据
+        sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s "
+        cursor.execute(sql, [expiration_time, size])
+        for week_val in del_week_val_list:
             if week_val == 1:
-                sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s "
-            if week_val == 2:
                 sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s "
-            if week_val == 3:
+            if week_val == 2:
                 sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s "
-            if week_val == 4:
+            if week_val == 3:
                 sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s "
-            if week_val == 5:
+            if week_val == 4:
                 sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s "
-            if week_val == 6:
+            if week_val == 5:
                 sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s "
-            if week_val == 7:
+            if week_val == 6:
                 sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s "
-            for i in range(5):
-                cursor.execute(sql, [expiration_time, size])
-            # 关闭游标
-            cursor.close()
-            return response.json(0)
-        except Exception as e:
-            return response.json(500, repr(e))
+            if week_val == 7:
+                sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s "
+            cursor.execute(sql, [expiration_time, size])
+        # 关闭游标
+        cursor.close()
 
     @staticmethod
     def delVodHls(response):
@@ -226,7 +259,7 @@ class CronDelDataView(View):
             split_vod_hls_obj.del_vod_hls_data(end_time__lt=month_ago_time)
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def del_vod_hls_tag():
@@ -250,7 +283,7 @@ class CronDelDataView(View):
             cursor.close()
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def delTesterDevice(response):
@@ -292,7 +325,26 @@ class CronDelDataView(View):
                 Device_Info.objects.filter(userID__in=device_user).delete()
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def del_device_log(response):
+        """
+        定时删除设备日志
+        @param response: 响应对象
+        @return:
+        """
+        nowTime = int(time.time())
+        try:
+            cursor = connection.cursor()
+            month_ago_time = nowTime - 30 * 24 * 60 * 60  # 保留近30天的数据
+
+            sql = 'DELETE FROM `device_log` WHERE unix_timestamp(add_time)<{}'.format(month_ago_time)
+            cursor.execute(sql)
+            cursor.close()
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
 
 class CronUpdateDataView(View):
@@ -411,26 +463,25 @@ class CronUpdateDataView(View):
     @classmethod
     def reqUpdateSerialStatus(cls, response):
         redis_obj = RedisObject()
-        logger = logging.getLogger('info')
 
         # 更新已使用序列号其他服务器的状态
         used_serial_redis_list = redis_obj.lrange(USED_SERIAL_REDIS_LIST, 0, -1)  # 读取redis已使用序列号
         if used_serial_redis_list:
-            logger.info('---请求更新已使用序列号列表---used_serial_redis_list:{}'.format(used_serial_redis_list))
+            LOGGER.info('---请求更新已使用序列号列表---used_serial_redis_list:{}'.format(used_serial_redis_list))
             used_serial_redis_list = [str(i, 'utf-8') for i in used_serial_redis_list]
             cls.do_request_function(used_serial_redis_list, 3)
 
         # 更新未使用序列号其他服务器的状态
         unused_serial_redis_list = redis_obj.lrange(UNUSED_SERIAL_REDIS_LIST, 0, -1)  # 读取redis未使用序列号
         if unused_serial_redis_list:
-            logger.info('---请求更新未使用序列号列表---unused_serial_redis_list:{}'.format(unused_serial_redis_list))
+            LOGGER.info('---请求更新未使用序列号列表---unused_serial_redis_list:{}'.format(unused_serial_redis_list))
             unused_serial_redis_list = [str(i, 'utf-8') for i in unused_serial_redis_list]
             cls.do_request_function(unused_serial_redis_list, 1)
 
         # 重置地区id
         reset_region_id_serial_redis_list = redis_obj.lrange(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, -1)  # 读取redis未使用序列号
         if reset_region_id_serial_redis_list:
-            logger.info('---请求重置地区id的序列号列表---:{}'.format(reset_region_id_serial_redis_list))
+            LOGGER.info('---请求重置地区id的序列号列表---:{}'.format(reset_region_id_serial_redis_list))
             reset_region_id_serial_redis_list = [str(i, 'utf-8') for i in reset_region_id_serial_redis_list]
             cls.do_request_reset_region_id(reset_region_id_serial_redis_list)
         return response.json(0)
@@ -449,31 +500,30 @@ class CronUpdateDataView(View):
         # 确认域名列表
         orders_domain_name_list = CommonService.get_orders_domain_name_list()
         redis_obj = RedisObject()
-        logger = logging.getLogger('info')
-        logger.info('---请求更新序列号线程---data:{},orders_domain_name_list:{}'.format(data, orders_domain_name_list))
+        LOGGER.info('---请求更新序列号线程---data:{},orders_domain_name_list:{}'.format(data, orders_domain_name_list))
         try:
             requests_failed_flag = False  # 请求失败标志位
             for domain_name in orders_domain_name_list:
                 url = '{}cron/update/updateSerialStatus'.format(domain_name)
                 response = requests.post(url=url, data=data, timeout=5)
-                logger.info('---请求更新序列号响应时间---:{}'.format(response.elapsed.total_seconds()))
+                LOGGER.info('---请求更新序列号响应时间---:{}'.format(response.elapsed.total_seconds()))
                 result = response.json()
                 if result['result_code'] != 0:  # 请求失败标志位置位
                     requests_failed_flag = True
                     break
 
                 # 状态为未使用,重置美洲服的地区id
-                if status == 1:     # 美洲服直接更新
+                if status == 1:  # 美洲服直接更新
                     if CONFIG_INFO == CONFIG_US:
-                        DeviceDomainRegionModel.objects.filter(~Q(region_id=0), serial_number__in=serial_redis_list).\
+                        DeviceDomainRegionModel.objects.filter(~Q(region_id=0), serial_number__in=serial_redis_list). \
                             update(region_id=0)
-                    else:   # 其他服请求到美洲服更新
+                    else:  # 其他服请求到美洲服更新
                         req_url = 'https://www.dvema.com/cron/update/reset-region-id'
                         req_data = {
                             'serial_redis_list': str(serial_redis_list)
                         }
                         response = requests.post(url=req_url, data=req_data, timeout=5)
-                        logger.info('---请求重置地区id响应时间---:{}'.format(response.elapsed.total_seconds()))
+                        LOGGER.info('---请求重置地区id响应时间---:{}'.format(response.elapsed.total_seconds()))
                         result = response.json()
                         if result['result_code'] != 0:  # 请求失败标志位置位
                             requests_failed_flag = True
@@ -487,7 +537,7 @@ class CronUpdateDataView(View):
                     for i in serial_redis_list:
                         redis_obj.lrem(USED_SERIAL_REDIS_LIST, 0, i)
         except Exception as e:
-            logger.info('---更新序列号状态异常---:{}'.format(repr(e)))
+            LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
 
     @staticmethod
     def do_request_reset_region_id(reset_region_id_serial_redis_list):
@@ -496,7 +546,6 @@ class CronUpdateDataView(View):
         @param reset_region_id_serial_redis_list: 序列号redis列表
         """
         redis_obj = RedisObject()
-        logger = logging.getLogger('info')
         requests_failed_flag = False  # 请求失败标志位
         data = {
             'serial_redis_list': str(reset_region_id_serial_redis_list),
@@ -512,7 +561,7 @@ class CronUpdateDataView(View):
                 for serial in reset_region_id_serial_redis_list:
                     redis_obj.lrem(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, serial)
         except Exception as e:
-            logger.info('---请求重置地区id异常---:{}'.format(repr(e)))
+            LOGGER.info('---请求重置地区id异常---:{}'.format(repr(e)))
 
     @staticmethod
     def updateSerialStatus(request_dict, response):
@@ -525,8 +574,7 @@ class CronUpdateDataView(View):
         """
         serial_redis_list = request_dict.get('serial_redis_list', None)
         status = request_dict.get('status', None)
-        logger = logging.getLogger('info')
-        logger.info('---更新序列号状态参数---serial_redis_list:{},status:{}'.format(serial_redis_list, status))
+        LOGGER.info('---更新序列号状态参数---serial_redis_list:{},status:{}'.format(serial_redis_list, status))
         if not all([serial_redis_list, status]):
             return response.json(444)
         now_time = int(time.time())
@@ -536,7 +584,7 @@ class CronUpdateDataView(View):
                                                                                           update_time=now_time)
             return response.json(0)
         except Exception as e:
-            logger.info('---更新序列号状态异常---:{}'.format(repr(e)))
+            LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
             return response.json(500)
 
     @staticmethod
@@ -548,8 +596,7 @@ class CronUpdateDataView(View):
         @param response: 响应对象
         """
         serial_redis_list = request_dict.get('serial_redis_list', None)
-        logger = logging.getLogger('info')
-        logger.info('---重置地区id参数---serial_redis_list:{}'.format(serial_redis_list))
+        LOGGER.info('---重置地区id参数---serial_redis_list:{}'.format(serial_redis_list))
         if not serial_redis_list:
             return response.json(444)
         try:
@@ -557,7 +604,7 @@ class CronUpdateDataView(View):
             DeviceDomainRegionModel.objects.filter(serial_number__in=serial_redis_list).update(region_id=0)
             return response.json(0)
         except Exception as e:
-            logger.info('---重置地区id异常---:{}'.format(repr(e)))
+            LOGGER.info('---重置地区id异常---:{}'.format(repr(e)))
             return response.json(500)
 
     @staticmethod
@@ -639,7 +686,7 @@ class CronCollectDataView(View):
 
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def collect_device_user(response):
@@ -696,7 +743,7 @@ class CronCollectDataView(View):
 
             return response.json(0)
         except Exception as e:
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def collect_order(response):