Browse Source

每日统计储存量
每月统计运营利润及成本

peng 1 year ago
parent
commit
23f877b63c
2 changed files with 245 additions and 4 deletions
  1. 136 4
      Controller/Cron/CronTaskController.py
  2. 109 0
      Object/AWS/AmazonS3Util.py

+ 136 - 4
Controller/Cron/CronTaskController.py

@@ -22,17 +22,18 @@ from django.db.models import Q, Sum, Count
 from django.views import View
 
 from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST, CONFIG_INFO, CONFIG_US, \
-    RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER, PAYPAL_CRD, CONFIG_EUR, DETECT_PUSH_DOMAINS
+    RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER, PAYPAL_CRD, CONFIG_EUR, DETECT_PUSH_DOMAINS, ACCESS_KEY_ID, \
+    SECRET_ACCESS_KEY, REGION_NAME
 from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \
     VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \
     CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
     CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, \
     Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo, AbnormalOrder, DailyReconciliation, \
-    CustomizedPush, UIDCompanySerialModel, UIDModel, LogModel
+    CustomizedPush, UIDCompanySerialModel, UIDModel, LogModel, OperatingCosts, UidBucketStatistics
+from Object.AWS.AmazonS3Util import AmazonS3Util
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
 from Object.utils import LocalDateTimeUtil
-from Object.utils.PayPalUtil import PayPalService
 from Service.CommonService import CommonService
 from Service.EquipmentInfoService import EQUIPMENT_INFO_MODEL_LIST
 from Service.VodHlsService import SplitVodHlsObject
@@ -299,7 +300,7 @@ class CronDelDataView(View):
         # 每次删除条数
         size = 5000
 
-        for i in range(1, len(EQUIPMENT_INFO_MODEL_LIST)+1):
+        for i in range(1, len(EQUIPMENT_INFO_MODEL_LIST) + 1):
             sql = "DELETE FROM equipment_info_{} WHERE add_time< %s LIMIT %s ".format(i)
             cursor.execute(sql, [expiration_time, size])
 
@@ -757,6 +758,10 @@ class CronCollectDataView(View):
             return self.collect_device_info(response)
         elif operation == 'collectFlowInfo':  # 定时保存设备数据
             return self.collect_flow_info(response)
+        elif operation == 'collectOperatingCosts':  # 定时运营成本
+            return self.collect_operating_costs(response)
+        elif operation == 'collectObjSize':  # 定时设备s3存储量
+            return self.collect_obj_size(response)
         else:
             return response.json(404)
 
@@ -1081,6 +1086,133 @@ class CronCollectDataView(View):
         except Exception as e:
             return response.json(500, repr(e))
 
+    @staticmethod
+    def collect_operating_costs(response):
+        try:
+            today = datetime.datetime.today()
+            start_time = datetime.datetime(today.year, today.month, 1)
+            end_time = start_time + relativedelta(months=1)
+            start_time_stamp = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
+            end_time_stamp = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
+            thread = threading.Thread(target=CronCollectDataView.thread_collect_operating_costs,
+                                      args=(start_time_stamp, end_time_stamp, start_time, end_time))
+            thread.start()  # 启动线程
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def thread_collect_operating_costs(start_time_stamp, end_time_stamp, start_time, end_time):
+        try:
+            operating_costs_qs = OperatingCosts.objects.filter(time=start_time_stamp).values('order_id', 'end_time',
+                                                                                             'uid')
+            for item in operating_costs_qs:
+                order_qs = Order_Model.objects.filter(orderID=item['order_id'], UID=item['uid']).values('price',
+                                                                                                        'payTime',
+                                                                                                        'order_type',
+                                                                                                        'rank__expire',
+                                                                                                        'fee',
+                                                                                                        'payType')
+                if order_qs.exists():
+                    order = order_qs[0]
+                    if order['order_type'] not in [0, 1]:
+                        continue
+                    price = float(order['price'])
+                    if order['payType'] in [2, 3]:
+                        fee = 0.0054 * price
+                    else:
+                        fee = float(order['fee']) if order['fee'] else 0
+                    if item['end_time'] > end_time_stamp:  # 当月结算天数
+                        settlement_days = (end_time - start_time).days
+                        remaining_usage_time = (item['end_time'] - end_time_stamp) // 86400  # 剩余使用时间
+                    else:
+                        settlement_days = (item['end_time'] - start_time_stamp) // 86400
+                        remaining_usage_time = 0
+                    day_average_price = round(price / (order['rank__expire'] * 30), 2)  # 收入分摊/天
+                    month_average_price = round(day_average_price * settlement_days, 2)  # 收入分摊/月
+                    monthly_income = round((price - fee) / (order['rank__expire'] * 30) * settlement_days, 2)  # 当月结算收入
+                    if item['end_time'] < end_time_stamp:
+                        uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
+                                                                                   time__lt=item['end_time'],
+                                                                                   uid=item['uid'])
+                    else:
+                        uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
+                                                                                   time__lt=end_time_stamp,
+                                                                                   uid=item['uid'])
+                    result = uid_bucket_statistics.aggregate(size=Sum('storage_size'), api_count=Sum('api_count'))
+                    actual_storage = round(result['size'], 2) if result['size'] else 0
+                    actual_api = result['api_count'] if result['api_count'] else 0
+                    OperatingCosts.objects.filter(time=start_time_stamp, order_id=item['order_id'],
+                                                  uid=item['uid']).update(day_average_price=day_average_price,
+                                                                          month_average_price=month_average_price,
+                                                                          monthly_income=monthly_income,
+                                                                          actual_storage=actual_storage,
+                                                                          settlement_days=settlement_days,
+                                                                          actual_api=actual_api,
+                                                                          remaining_usage_time=remaining_usage_time)
+            print('结束')
+        except Exception as e:
+            LOGGER.info(
+                'thread_collect_operating_costs接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def collect_obj_size(response):
+        try:
+            today = datetime.datetime.today()
+            end_time = datetime.datetime(today.year, today.month, today.day)
+            first_date = datetime.datetime(today.year, today.month, 1)
+            start_time = end_time - datetime.timedelta(days=1)
+            start_time_stamp = int(start_time.timestamp())
+            end_time_stamp = int(end_time.timestamp())
+            first_date_stamp = int(first_date.timestamp())
+            thread = threading.Thread(target=CronCollectDataView.thread_collect_obj_size,
+                                      args=(start_time_stamp, end_time_stamp, first_date_stamp))
+            thread.start()  # 启动线程
+
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def thread_collect_obj_size(start_time, end_time, first_date):
+        try:
+            creat_time = int(time.time())
+            uid_list = UidBucketStatistics.objects.filter(time=start_time).values_list('uid', flat=True)
+            uid_vod = UID_Bucket.objects.filter(Q(endTime__gte=start_time), ~Q(uid__in=uid_list)).values('uid',
+                                                                                                         'bucket__bucket',
+                                                                                                         'orderId',
+                                                                                                         'channel',
+                                                                                                         'endTime')
+            s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
+            for item in uid_vod:
+                path = '{uid}/vod{channel}'.format(uid=item['uid'], channel=item['channel'])
+                s3_result = s3_obj.get_object_list(item['bucket__bucket'], path,
+                                                   path + '/{}'.format(start_time), end_time)
+                actual_storage = 0
+                actual_api = 0
+                for obj in s3_result:
+                    temp_time = int(obj['Key'].split('/')[2])
+                    if temp_time < end_time:
+                        actual_storage += obj['Size']
+                        actual_api += 1
+                actual_storage = round(actual_storage / 1024 / 1024, 2)
+                with transaction.atomic():
+                    if actual_api:
+                        UidBucketStatistics.objects.create(uid=item['uid'], storage_size=actual_storage,
+                                                           api_count=actual_api,
+                                                           created_time=creat_time,
+                                                           time=start_time)
+                    operating_costs_qs = OperatingCosts.objects.filter(order_id=item['orderId'], uid=item['uid'],
+                                                                       time=first_date)
+                    if not operating_costs_qs.exists():
+                        OperatingCosts.objects.create(order_id=item['orderId'], uid=item['uid'],
+                                                      created_time=creat_time, time=first_date,
+                                                      end_time=item['endTime'])
+                print(actual_storage, actual_api)
+            print('结束')
+        except Exception as e:
+            LOGGER.info('统计s3信息异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
     @staticmethod
     def collect_device_info(response):
         try:

+ 109 - 0
Object/AWS/AmazonS3Util.py

@@ -149,3 +149,112 @@ class AmazonS3Util:
             'Key': file_key
         }
         self.session_conn.meta.client.copy(source_dict, to_bucket, file_key)
+
+    def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None):
+        """
+        单个对象复制
+        @param source_bucket:源存储桶
+        @param source_object:源对象
+        @param target_bucket:目标存储桶
+        @param target_object:目标对象
+        @param StorageClass:存储类
+        @return: None
+        """
+        s3 = self.session_conn
+        copy_source = {
+            'Bucket': source_bucket,
+            'Key': source_object
+        }
+        target_object = s3.Object(target_bucket, target_object)
+        if StorageClass:
+            target_object.copy_from(CopySource=copy_source, StorageClass=StorageClass)
+        else:
+            target_object.copy_from(CopySource=copy_source)
+
+    def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None):
+        """
+        生成预签名对象URL
+        @param bucket_name: 存储桶名称
+        @param obj_key: 对象key
+        @param storage_class: 存储类 例
+        @return: 对象URL
+        """
+        params = {
+            'Bucket': bucket_name,
+            'Key': obj_key,
+        }
+        if storage_class:
+            params['StorageClass'] = storage_class
+        return self.session_conn.meta.client.generate_presigned_url('put_object',
+                                                                    Params=params,
+                                                                    ExpiresIn=7200)
+
+    def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None):
+        """
+        批量拷贝对象
+        @param source_bucket: 源存储桶
+        @param target_bucket: 目标存储桶
+        @param prefix: 需要搜索的对象前缀 例:AUS000247LTCLY/vod1/1686043996
+        @param target_prefix: 目标对象前缀 例:app/algorithm-shop/1686043996
+        @param storage_class: 存储类
+        @return: None
+        """
+        s3 = self.session_conn
+        # 遍历源存储桶中指定前缀下的所有对象,依次进行复制操作
+        for obj in s3.Bucket(source_bucket).objects.filter(Prefix=prefix):
+            key = obj.key  # 对象键名
+            target_key = f'{target_prefix}/' + key.split('/')[-1]  # 新的对象键名,此处为 "new_path/" + 原有文件名
+            copy_source = {
+                'Bucket': source_bucket,
+                'Key': key
+            }
+            # 将对象复制到目标存储桶,并设置存储类型和新的对象键名
+            if storage_class:
+                s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source, StorageClass=storage_class)
+            else:
+                s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source)
+
+    def get_object_size(self, bucket_name, object_key):
+        """
+        获取存储桶中指定对象的大小
+
+        :param bucket_name: string,存储桶名称
+        :param object_key: string,对象键名
+        :return: int,指定对象的大小,单位为字节
+        """
+        s3 = self.session_conn
+        obj = s3.Object(bucket_name, object_key)
+        try:
+            return obj.content_length
+        except Exception as e:
+            return 0
+
+    def get_object_list(self, bucket_name, prefix, start_after='', end_time=None):
+        """
+        获取指定路径所有对象
+
+        :param bucket_name: string,存储桶名称
+        :param prefix: string,路径
+        :param start_after: string,开始键
+        :param end_time: string,结束时间
+        :return: int,指定对象的大小,单位为字节
+        """
+        try:
+            s3 = self.client_conn
+            continuation_token = ''
+            contents = []
+            while True:
+                if continuation_token:
+                    result = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after,
+                                                ContinuationToken=continuation_token)
+                else:
+                    result = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after)
+                contents += result['Contents']
+                continuation_token = result['NextContinuationToken']
+                if result['KeyCount'] < 1000:
+                    break
+                if end_time and end_time < int(result['Contents'][-1]['Key'].split('/')[2]):
+                    break
+            return contents
+        except Exception as e:
+            return []