Эх сурвалжийг харах

阿里云云存订单成本统计、入库产品订单统计

zhangdongming 4 өдөр өмнө
parent
commit
1bb319f22b

+ 150 - 5
AdminController/ProductsSchemeManageController.py

@@ -18,11 +18,12 @@ from PIL import Image, ImageDraw, ImageFont
 from django.core.paginator import Paginator, EmptyPage
 from django.db import transaction, IntegrityError
 from django.db.models import Q
-from django.http import QueryDict, HttpResponse
+from django.db.models.functions import Substr
+from django.http import QueryDict, HttpResponse, JsonResponse
 from django.views import View
 
 from Ansjer.config import LOGGER, BASE_DIR
-from Model.models import ProductsScheme, DeviceScheme, DeviceTypeModel, CompanySerialModel
+from Model.models import ProductsScheme, DeviceScheme, DeviceTypeModel, CompanySerialModel, Device_User
 from Object.ResponseObject import ResponseObject
 from Object.TokenObject import TokenObject
 
@@ -83,6 +84,8 @@ class ProductsSchemeManageView(View):
             'batchAddScheme': self.batch_add_scheme,  # 批量添加
             'getSkuList': self.get_sku_list,  # 获取SKU列表'
             'getSchemeBySku': self.get_scheme_by_sku,  # 根据SKU获取方案
+            'getOrderData': self.get_order_data, # 统计订单入库数据
+            'getOrderList': self.get_order_list, # 返回所有订单下拉框选择
         }
 
         handler = operation_handlers.get(operation)
@@ -138,8 +141,19 @@ class ProductsSchemeManageView(View):
         except EmptyPage:
             return response.json(444, "页码超出范围")
 
-        # 序列化数据
-        data = [self._scheme_to_dict(scheme) for scheme in page_obj]
+        creator_ids = [scheme.created_by for scheme in page_obj if scheme.created_by]
+
+        user_mapping = {
+            user.userID : user.username
+            for user in Device_User.objects.filter(userID__in=creator_ids)
+        }
+
+        # 序列化数据并补充username字段
+        data = []
+        for scheme in page_obj:
+            scheme_dict = self._scheme_to_dict(scheme)
+            scheme_dict['username'] = user_mapping.get(scheme.created_by, '')
+            data.append(scheme_dict)
 
         return response.json(0, {
             'list': data,
@@ -731,4 +745,135 @@ class ProductsSchemeManageView(View):
 
         except Exception as e:
             LOGGER.exception(f"获取方案详情异常: {repr(e)}")
-            return response.json(500, "获取方案详情失败")
+            return response.json(500, "获取方案详情失败")
+
+    @staticmethod
+    def get_order_data(userID: int,
+                       request_dict: QueryDict, response: ResponseObject) -> JsonResponse:
+        """
+                           获取设备入库订单统计数据
+
+                           Args:
+                               userID (int): 用户ID
+                               request (HttpRequest): 请求对象
+                               request_dict (QueryDict): 请求参数字典
+                               response (ResponseObject): 响应对象
+
+                           Returns:
+                               JsonResponse: 包含数据汇总和按月份或订单号明细的统计数据
+                           """
+        try:
+            # 获取参数
+            start_time = request_dict.get('start_time')
+            end_time = request_dict.get('end_time')
+            order_number = request_dict.get('order_number')
+
+            product_filters = {}
+
+            # 时间戳校验
+            if start_time is not None and end_time is not None:
+                try:
+                    product_filters['created_time__gte'] = int(start_time)
+                    product_filters['created_time__lte'] = int(end_time)
+                except ValueError:
+                    return response.json(400, '无效的时间戳')
+
+            # 查询产品订单
+            products_schemes_qs = ProductsScheme.objects.filter(**product_filters).exclude(deleted=1)
+
+            # 所有 storage_code
+            storage_codes = list(products_schemes_qs.values_list('storage_code', flat=True))
+
+            # 查设备
+            device_queryset = DeviceScheme.objects.filter(storage_code__in=storage_codes)
+
+            # 设备序列号前缀
+            serial_prefixes = device_queryset.annotate(
+                prefix=Substr('serial_number', 1, 6)
+            ).values_list('prefix', flat=True)
+
+            # 激活序列号前缀
+            activated_prefixes_qs = CompanySerialModel.objects.filter(
+                serial_number__in=serial_prefixes,
+                status__gt=1
+            ).values_list('serial_number', flat=True)
+
+            activated_prefixes = set(activated_prefixes_qs)
+
+            if order_number:
+                product_scheme = products_schemes_qs.filter(order_number=order_number).first()
+
+                if not product_scheme:
+                    return response.json(0, {})
+
+                order_devices_qs = DeviceScheme.objects.filter(storage_code=product_scheme.storage_code)
+
+                # 设备总数
+                devices_in_stock = order_devices_qs.count()
+
+                # 激活数量
+                order_prefixes = order_devices_qs.annotate(
+                    prefix=Substr('serial_number', 1, 6)
+                ).values_list('prefix', flat=True)
+
+                devices_activated = CompanySerialModel.objects.filter(
+                    serial_number__in=order_prefixes,
+                    status__gt=1
+                ).count()
+
+                return response.json(0, {
+                    'order_number': product_scheme.order_number,
+                    'devices_in_stock': devices_in_stock,
+                    'devices_activated': devices_activated,
+                    'devices_inactive': devices_in_stock - devices_activated
+                })
+            else:
+                # 查询汇总数据
+                total_orders = products_schemes_qs.values('order_number').distinct().count()
+                total_devices_in_stock = device_queryset.count()
+
+                # 总激活数量
+                total_activated_devices = CompanySerialModel.objects.filter(
+                    serial_number__in=serial_prefixes,
+                    status__gt=1
+                ).count()
+
+                return response.json(0, {
+                    'total_orders': total_orders,
+                    'total_devices_in_stock': total_devices_in_stock,
+                    'total_activated_devices': total_activated_devices,
+                    'total_inactivated_devices': total_devices_in_stock - total_activated_devices
+                })
+
+        except Exception as e:
+            error_msg: str = f'查询设备入库数据异常 - 用户: {userID}, 错误: {str(e)}'
+            error_line: int = e.__traceback__.tb_lineno
+            LOGGER.error(f'{error_msg} 行号: {error_line}')
+            return response.json(500)
+
+    @staticmethod
+    def get_order_list(userID: int,
+                       request_dict: QueryDict, response: ResponseObject) -> JsonResponse:
+        """
+               获取设备订单数据
+
+               Args:
+                   userID (int): 用户ID
+                   request (HttpRequest): 请求对象
+                   request_dict (QueryDict): 请求参数字典
+                   response (ResponseObject): 响应对象
+
+               Returns:
+                   JsonResponse: 包含订单列表数据汇总
+               """
+        try:
+            order_list = list(
+                ProductsScheme.objects.values_list('order_number', flat=True).exclude(deleted=1).distinct()
+            )
+
+            return response.json(0, order_list)
+        except Exception as e:
+            error_msg: str = f'查询设备订单列表异常 - 用户: {userID}, 错误: {str(e)}'
+            error_line: int = e.__traceback__.tb_lineno
+            LOGGER.error(f'{error_msg} 行号: {error_line}')
+            return response.json(500)

+ 228 - 73
Controller/Cron/CronTaskController.py

@@ -46,8 +46,15 @@ from Object.WechatPayObject import WechatPayObject
 from Object.AliPayObject import AliPayObject
 from dateutil.relativedelta import relativedelta
 from django.conf import settings
+import datetime as dt
+from Ansjer.config import LOGGER
+from Object.AliOssUtil import AliOssUtil
+
 ACCESS_KEY_ID = settings.ACCESS_KEY_ID
 SECRET_ACCESS_KEY = settings.SECRET_ACCESS_KEY
+# aliyun
+ALICLOUD_AK = settings.ALICLOUD_AK
+ALICLOUD_SK = settings.ALICLOUD_SK
 
 
 class CronDelDataView(View):
@@ -1298,12 +1305,12 @@ class CronCollectDataView(View):
     @staticmethod
     def collect_operating_costs(response):
         try:
-            today = datetime.datetime.today()
-            end_time = datetime.datetime(today.year, today.month, today.day)
-            yesterday = end_time - datetime.timedelta(days=1)
-            start_time = datetime.datetime(yesterday.year, yesterday.month, 1)
-            start_time_stamp = int(start_time.timestamp())
-            end_time_stamp = int(end_time.timestamp())
+            today = datetime.datetime.today()  # 获取当前日期时间(如2025-11-27 10:00:00)
+            end_time = datetime.datetime(today.year, today.month, today.day)  # 今日凌晨0点(如2025-11-27 00:00:00)
+            yesterday = end_time - datetime.timedelta(days=1)  # 昨日凌晨0点(如2025-11-26 00:00:00)
+            start_time = datetime.datetime(yesterday.year, yesterday.month, 1)  # 当月第一天凌晨(如2025-11-01 00:00:00)
+            start_time_stamp = int(start_time.timestamp())  # 当月第一天时间戳(整数,用于数据筛选)
+            end_time_stamp = int(end_time.timestamp())  # 今日凌晨时间戳(整数,用于数据筛选)
             thread = threading.Thread(target=CronCollectDataView.thread_collect_operating_costs,
                                       args=(start_time_stamp, end_time_stamp, start_time, end_time))
             thread.start()  # 启动线程
@@ -1317,17 +1324,17 @@ class CronCollectDataView(View):
             create_time = int(time.time())
             today_end_time = end_time_stamp + 86400
             operating_costs_qs_1 = OperatingCosts.objects.filter(time=start_time_stamp).exclude(
-                created_time__gte=end_time_stamp, created_time__lt=today_end_time).values('order_id', 'end_time', 'uid')
+                created_time__gte=end_time_stamp, created_time__lt=today_end_time).values('order_id', 'end_time', 'uid',
+                                                                                          'vod_location')
             operating_costs_qs_2 = OperatingCosts.objects.filter(time=start_time_stamp,
                                                                  created_time__gte=end_time_stamp,
                                                                  created_time__lt=today_end_time, start_time=0).values(
-                'order_id', 'end_time', 'uid')
+                'order_id', 'end_time', 'uid', 'vod_location')
             operating_costs_qs = operating_costs_qs_1.union(operating_costs_qs_2)
-            storage_univalence = 0.023 / 30
-            api_univalence = 0.005 / 1000
             region = '国内' if CONFIG_INFO == CONFIG_CN else '国外'
             country_qs = CountryModel.objects.values('id', 'country_name')
             country_dict = {}
+            redis_obj = RedisObject()
             for item in country_qs:
                 country_dict[item['id']] = item['country_name']
             for item in operating_costs_qs:
@@ -1350,91 +1357,215 @@ class CronCollectDataView(View):
                     order_start_time = int((datetime.datetime.fromtimestamp(item['end_time']) - relativedelta(
                         months=order['rank__expire'])).timestamp())
                     order_days = math.ceil((item['end_time'] - order_start_time) / 86400)
+                    # 计算结算天数和查询范围
                     if item['end_time'] > end_time_stamp:  # 订单结束时间大于统计时间
                         if order_start_time <= start_time_stamp:  # 订单月初之前开始
-                            settlement_days = (end_time - start_time).days  # 当月结算天数=月初-月底
-                            uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
-                                                                                       time__lte=end_time_stamp,
-                                                                                       uid=item['uid'])
+                            settlement_days = (end_time - start_time).days
+                            query_start = start_time_stamp
+                            query_end = end_time_stamp
                         elif order_start_time >= end_time_stamp:  # 订单在统计时间之后开始
                             settlement_days = 1
-                            uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=end_time_stamp,
-                                                                                       time__lt=order_start_time,
-                                                                                       uid=item['uid'])
+                            query_start = end_time_stamp
+                            query_end = order_start_time
                         else:  # 订单月初和统计时间之间开始
                             settlement_days = math.ceil((end_time_stamp - order_start_time) / 86400)
-                            uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=order_start_time,
-                                                                                       time__lte=end_time_stamp,
-                                                                                       uid=item['uid'])
-                        remaining_usage_time = math.ceil((item['end_time'] - end_time_stamp) / 86400)  # 剩余使用时间
+                            query_start = order_start_time
+                            query_end = end_time_stamp
+                        remaining_usage_time = math.ceil((item['end_time'] - end_time_stamp) / 86400)
                     else:  # 订单结束时间小于统计时间
                         if order_start_time <= start_time_stamp:
                             settlement_days = math.ceil((item['end_time'] - start_time_stamp) / 86400)
-                            uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
-                                                                                       time__lt=item['end_time'],
-                                                                                       uid=item['uid'])
+                            query_start = start_time_stamp
+                            query_end = item['end_time']
                         else:
                             settlement_days = math.ceil((item['end_time'] - order_start_time) / 86400)
-                            uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=order_start_time,
-                                                                                       time__lt=item['end_time'],
-                                                                                       uid=item['uid'])
+                            query_start = order_start_time
+                            query_end = item['end_time']
                         remaining_usage_time = 0
-                    day_average_price = round(price / order_days, 2)  # 收入分摊/天
-                    month_average_price = round(day_average_price * settlement_days, 2)  # 收入分摊/月
-                    monthly_income = round((price - fee) / order_days * settlement_days, 2)  # 当月结算收入
+
+                    # 获取统计周期内的所有日统计记录
+                    uid_bucket_statistics = UidBucketStatistics.objects.filter(
+                        time__gte=query_start,
+                        time__lt=query_end,
+                        uid=item['uid']
+                    ).order_by('time')
+
+                    day_average_price = round(price / order_days, 2)
+                    month_average_price = round(day_average_price * settlement_days, 2)
+                    monthly_income = round((price - fee) / order_days * settlement_days, 2)
                     real_income = round(price - fee, 2)
-                    result = uid_bucket_statistics.aggregate(size=Sum('storage_size'), api_count=Sum('api_count'))
-                    actual_storage = round(result['size'], 2) if result['size'] else 0
-                    actual_api = result['api_count'] if result['api_count'] else 0
-                    storage_cost = actual_storage / 1024 * storage_univalence * settlement_days
-                    api_cost = actual_api * api_univalence
-                    if CONFIG_INFO == CONFIG_CN:  # 国内要换算汇率
+
+                    # 根据云存位置选择不同的成本计算方式
+                    if item['vod_location'] == 0:  # 阿里云OSS
+                        try:
+                            yesterday_stamp = end_time_stamp - 86400
+                            uid_bucket_statistics = uid_bucket_statistics.filter(uid=item['uid'],
+                                                                                 time__gte=yesterday_stamp,
+                                                                                 time__lt=end_time_stamp)
+                            storage_cost, api_cost = CronCollectDataView._calculate_aliyun_costs(
+                                uid_bucket_statistics, redis_obj
+                            )
+                            ali_costs_qs = OperatingCosts.objects.filter(
+                                time=start_time_stamp,
+                                order_id=item['order_id'],
+                                uid=item['uid']
+                            ).values('storage_cost', 'api_cost')
+                            if ali_costs_qs.exists():
+                                ali_costs = ali_costs_qs[0]
+                                storage_cost += float(ali_costs['storage_cost'])
+                                api_cost += float(ali_costs['api_cost'])
+                        except Exception as e:
+                            LOGGER.error(f'统计阿里云云存成本异常error_line:{e.__traceback__.tb_lineno}, error_msg:{str(e)}')
+                            continue
+
+                    else:  # AWS S3
+                        storage_cost, api_cost = CronCollectDataView._calculate_aws_costs(
+                            uid_bucket_statistics, settlement_days
+                        )
+
+                    # 汇率转换(仅国内)
+                    if CONFIG_INFO == CONFIG_CN and item['vod_location'] == 0:
                         storage_cost = storage_cost * 7
                         api_cost = api_cost * 7
-                    profit = round(monthly_income - storage_cost - api_cost, 2)  # 利润=月结算金额-月成本
+
+                    profit = round(monthly_income - storage_cost - api_cost, 2)
                     storage_cost = round(storage_cost, 2)
                     api_cost = round(api_cost, 2)
+
                     if monthly_income == 0.0:
                         profit_margin = 0
                     else:
-                        profit_margin = round(profit / month_average_price, 2)  # 利润率=利润/每月收入分摊
-                    OperatingCosts.objects.filter(time=start_time_stamp, order_id=item['order_id'],
-                                                  uid=item['uid']).update(day_average_price=day_average_price,
-                                                                          month_average_price=month_average_price,
-                                                                          monthly_income=monthly_income,
-                                                                          actual_storage=actual_storage,
-                                                                          settlement_days=settlement_days,
-                                                                          actual_api=actual_api, fee=fee,
-                                                                          created_time=create_time, region=region,
-                                                                          start_time=order_start_time,
-                                                                          remaining_usage_time=remaining_usage_time,
-                                                                          storage_cost=storage_cost, api_cost=api_cost,
-                                                                          profit=profit, profit_margin=profit_margin,
-                                                                          real_income=real_income, price=price,
-                                                                          country_name=country_name,
-                                                                          order_type=order_type, expire=expire)
+                        profit_margin = round(profit / month_average_price, 2)
+
+                    # 更新运营成本记录
+                    OperatingCosts.objects.filter(
+                        time=start_time_stamp,
+                        order_id=item['order_id'],
+                        uid=item['uid']
+                    ).update(
+                        day_average_price=day_average_price,
+                        month_average_price=month_average_price,
+                        monthly_income=monthly_income,
+                        actual_storage=round(sum(float(stat.storage_size) for stat in uid_bucket_statistics), 2),
+                        settlement_days=settlement_days,
+                        actual_api=sum(stat.api_count for stat in uid_bucket_statistics),
+                        fee=fee,
+                        created_time=create_time,
+                        region=region,
+                        start_time=order_start_time,
+                        remaining_usage_time=remaining_usage_time,
+                        storage_cost=storage_cost,
+                        api_cost=api_cost,
+                        profit=profit,
+                        profit_margin=profit_margin,
+                        real_income=real_income,
+                        price=price,
+                        country_name=country_name,
+                        order_type=order_type,
+                        expire=expire
+                    )
             print('结束')
         except Exception as e:
             LOGGER.info(
                 'thread_collect_operating_costs接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno,
-                                                                                      repr(e)))
+                                                                                      repr(e))
+            )
+
+    @classmethod
+    def _calculate_aws_costs(cls, uid_bucket_statistics, settlement_days):
+        """计算AWS S3成本"""
+        aws_storage_univalence = 0.023 / 30
+        aws_api_univalence = 0.005 / 1000
+
+        result = uid_bucket_statistics.aggregate(
+            size=Sum('storage_size'),
+            api_count=Sum('api_count')
+        )
+
+        total_storage = float(result['size']) if result['size'] else 0
+        total_api = result['api_count'] if result['api_count'] else 0
+
+        # AWS按天计费
+        storage_cost = total_storage / 1024 * aws_storage_univalence * settlement_days
+        api_cost = total_api * aws_api_univalence
+
+        return storage_cost, api_cost
+
+    @classmethod
+    def _calculate_aliyun_costs(cls, uid_bucket_statistics, redis_obj):
+        """
+        计算阿里云OSS存储及API调用成本
+        该方法通过聚合指定时间范围内的OSS存储用量和API调用次数,结合阿里云官方定价标准,
+        计算实际产生的存储成本与API调用成本,并支持结果缓存避免重复计算。
+
+        Args:
+            uid_bucket_statistics (QuerySet): OSS用量统计数据集,包含存储大小、API次数、时间戳等字段
+            redis_obj (RedisClient): Redis客户端实例,用于缓存计算结果避免重复统计
+        Returns:
+            Tuple[Decimal, Decimal]:
+                - 存储成本(元,应用折扣后)
+                - API调用成本(元,应用折扣后)
+        """
+        aliyun_storage_univalence = 0.0001666667  # GB/小时
+        aliyun_api_univalence = 0.01 / 10000  # 元/次
+        discount_rate = 0.48  # 4.8折
+
+        total_storage_cost = 0
+        total_api_cost = 0
+
+        # 按日期聚合数据
+        date_stats = {}
+        cost_stat = uid_bucket_statistics.first()
+        cost_key = f'ali:costs:uid:{cost_stat.uid}:time:{cost_stat.time}'
+        if redis_obj.get_data(cost_key):
+            raise TypeError("已统计过")
+        for stat in uid_bucket_statistics:
+            date = dt.datetime.fromtimestamp(stat.time).date()
+            if date not in date_stats:
+                date_stats[date] = {'storage': 0, 'api': 0}
+            date_stats[date]['storage'] += float(stat.storage_size)
+            date_stats[date]['api'] += stat.api_count
+
+        # 计算每日成本
+        for date, stats in date_stats.items():
+            # 计算当日上传的存储成本(直接计算7天费用)
+            daily_storage_gb = stats['storage'] / 1024  # 转换为GB
+
+            # 存储成本 = 当日上传大小 × 单价 × 24小时 × 7天
+            daily_storage_cost = daily_storage_gb * aliyun_storage_univalence * 24 * 7
+
+            # API成本 = API次数 × 单价(API没有7天概念,按实际调用次数)
+            daily_api_cost = stats['api'] * aliyun_api_univalence
+
+            total_storage_cost += daily_storage_cost
+            total_api_cost += daily_api_cost
+
+        redis_obj.set_data(cost_key, json.dumps(str(total_api_cost)), expire=3600 * 12)
+
+        # 应用折扣
+        total_storage_cost_discounted = total_storage_cost * discount_rate
+        total_api_cost_discounted = total_api_cost * discount_rate
+
+        return total_storage_cost_discounted, total_api_cost_discounted
+
 
     @staticmethod
     def collect_obj_size(response):
         try:
-            today = datetime.datetime.today()
-            end_time = datetime.datetime(today.year, today.month, today.day)
-            start_time = end_time - datetime.timedelta(days=1)
-            first_date = datetime.datetime(start_time.year, start_time.month, 1)
-            start_time_stamp = int(start_time.timestamp())
-            end_time_stamp = int(end_time.timestamp())
-            first_date_stamp = int(first_date.timestamp())
+            today = dt.datetime.today()  # 获取当前日期时间(如2025-11-26 10:00:00)
+            end_time = dt.datetime(today.year, today.month, today.day)  # 今日凌晨0点(如2025-11-26 00:00:00)
+            start_time = end_time - dt.timedelta(days=1)  # 昨日凌晨0点(如2025-11-25 00:00:00)
+            first_date = dt.datetime(start_time.year, start_time.month, 1)  # 当月第一天凌晨(如2025-11-01 00:00:00)
+            start_time_stamp = int(start_time.timestamp())  # 昨日0点时间戳(用于筛选数据)
+            end_time_stamp = int(end_time.timestamp())  # 今日0点时间戳(用于筛选数据)
+            first_date_stamp = int(first_date.timestamp())  # 当月第一天时间戳(用于月度统计)
             thread = threading.Thread(target=CronCollectDataView.thread_collect_obj_size,
                                       args=(start_time_stamp, end_time_stamp, first_date_stamp))
             thread.start()  # 启动线程
 
             return response.json(0)
         except Exception as e:
+            LOGGER.error(f'collect_obj_size接口异常:errLine:{e.__traceback__.tb_lineno}, errMsg:{repr(e)}')
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
@@ -1446,20 +1577,44 @@ class CronCollectDataView(View):
                                                                                                          'bucket__bucket',
                                                                                                          'orderId',
                                                                                                          'channel',
-                                                                                                         'endTime')
+                                                                                                         'endTime',
+                                                                                                         'bucket__vod_location',
+                                                                                                         'bucket__region')
+
             s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
+
             for item in uid_vod:
                 path = '{uid}/vod{channel}'.format(uid=item['uid'], channel=item['channel'])
-                s3_result = s3_obj.get_object_list(item['bucket__bucket'], path,
-                                                   path + '/{}'.format(start_time), end_time)
                 actual_storage = 0
                 actual_api = 0
-                for obj in s3_result:
-                    temp_time = int(obj['Key'].split('/')[2])
-                    if temp_time < end_time:
-                        actual_storage += obj['Size']
-                        actual_api += 1
-                actual_storage = round(actual_storage / 1024 / 1024, 2)
+                if item['bucket__vod_location'] == 1:  # 阿里云
+                    # 初始化OSS工具类
+                    oss_obj = AliOssUtil(
+                        ALICLOUD_AK,
+                        ALICLOUD_SK,
+                        f"https://oss-{item['bucket__region']}.aliyuncs.com"
+                    )
+                    # 获取所有文件并计算总大小
+                    result = oss_obj.list_all_objects(
+                        bucket_name=item['bucket__bucket'],
+                        prefix=path,
+                        start_after=f'{path}/{start_time}',
+                        max_keys=1000,
+                        end_time=end_time
+                    )
+                    if result:
+                        actual_storage = result['total_size_mb']
+                        actual_api = result['total_files']
+                else:
+                    s3_result = s3_obj.get_object_list(item['bucket__bucket'], path,
+                                                       path + '/{}'.format(start_time), end_time)
+
+                    for obj in s3_result:
+                        temp_time = int(obj['Key'].split('/')[2])
+                        if temp_time < end_time:
+                            actual_storage += obj['Size']
+                            actual_api += 1
+                    actual_storage = round(actual_storage / 1024 / 1024, 2)
                 with transaction.atomic():
                     if actual_api:
                         UidBucketStatistics.objects.create(uid=item['uid'], storage_size=actual_storage,
@@ -1471,9 +1626,9 @@ class CronCollectDataView(View):
                     if not operating_costs_qs.exists():
                         OperatingCosts.objects.create(order_id=item['orderId'], uid=item['uid'],
                                                       created_time=creat_time, time=first_date,
-                                                      end_time=item['endTime'])
-                print(actual_storage, actual_api)
-            print('结束')
+                                                      end_time=item['endTime'], vod_location=item['bucket__vod_location'])
+                LOGGER.info(f"uid:{item['uid']} 大小:{actual_storage} api次数:{actual_api}")
+            LOGGER.info('结束')
         except Exception as e:
             LOGGER.info('统计s3信息异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 

+ 1 - 0
Model/models.py

@@ -4290,6 +4290,7 @@ class OperatingCosts(models.Model):
     country_name = models.CharField(verbose_name='国家', default='', max_length=16)
     order_type = models.CharField(verbose_name='订单类型', default='', max_length=16)
     expire = models.CharField(verbose_name='订单时长', default='', max_length=16)
+    vod_location = models.SmallIntegerField(default=0, verbose_name='云存位置')
 
     def __str__(self):
         return self.id

+ 159 - 2
Object/AliOssUtil.py

@@ -1,6 +1,11 @@
 import traceback
+from decimal import Decimal, ROUND_HALF_UP
+from typing import Optional, Dict
+
 import oss2
 
+from Ansjer.config import LOGGER
+
 
 class AliOssUtil:
     def __init__(self, access_key_id, access_key_secret, endpoint):
@@ -8,7 +13,7 @@ class AliOssUtil:
         self.access_secret = access_key_secret
         self.endpoint = endpoint
         self.auth = oss2.Auth(access_key_id, access_key_secret)
-        
+
     def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None):
         """
         对象上传至OSS存储桶
@@ -76,4 +81,156 @@ class AliOssUtil:
             print(e.args)
             ex = traceback.format_exc()
             print('具体错误{}'.format(ex))
-            return False
+            return False
+
+    def list_objects(self, bucket_name: str, prefix: str = '', delimiter: str = '',
+                     continuation_token: str = '', start_after: str = '',
+                     fetch_owner: bool = False, encoding_type: str = 'url',
+                     max_keys: int = 1000, headers: Optional[dict] = None) -> Optional[dict]:
+        """
+        根据前缀罗列Bucket里的文件。
+
+        Args:
+            bucket_name (str): 存储桶名称。
+            prefix (str): 只罗列文件名为该前缀的文件。
+            delimiter (str): 分隔符,可以用来模拟目录。
+            continuation_token (str): 分页标志。首次调用传空串,后续使用返回值的next_continuation_token。
+            start_after (str): 起始文件名称,OSS会返回按照字典序排列start_after之后的文件。
+            fetch_owner (bool): 是否获取文件的owner信息,默认不返回。
+            encoding_type (str): 编码类型。
+            max_keys (int): 最多返回文件的个数,文件和目录的和不能超过该值。
+            headers (Optional[dict]): HTTP头部。
+
+        Returns:
+            Optional[dict]: 包含文件列表和分页信息的字典,或None(发生错误时)。
+                - files (List[str]): 文件列表。
+                - prefixes (List[str]): 目录前缀列表。
+                - next_continuation_token (str): 分页标志。
+                - is_truncated (bool): 是否还有更多结果。
+        """
+        try:
+            bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
+            result = bucket.list_objects_v2(
+                prefix=prefix,
+                delimiter=delimiter,
+                continuation_token=continuation_token,
+                start_after=start_after,
+                fetch_owner=fetch_owner,
+                encoding_type=encoding_type,
+                max_keys=max_keys,
+                headers=headers
+            )
+            return {
+                'files': result.object_list,
+                'prefixes': result.prefix_list,
+                'next_continuation_token': result.next_continuation_token,
+                'is_truncated': result.is_truncated
+            }
+        except Exception as e:
+            LOGGER.error('查询oss文件列表失败, :{},:errLine:{}, errMsg:{}'
+                         .format(bucket_name, e.__traceback__.tb_lineno, repr(e)))
+            raise RuntimeError(f"Failed to list objects in bucket {bucket_name}: {str(e)}")
+
+    def list_all_objects(self, bucket_name: str, prefix: str = '', start_after: str = '',
+                         max_keys: int = 1000, end_time: Optional[int] = None,
+                         include_files: bool = True) -> Dict:
+        """
+        递归获取所有分页的文件(封装分页逻辑,对外提供简洁接口),支持按10位时间戳截止时间过滤
+
+        Args:
+            bucket_name (str): 存储桶名称
+            prefix (str): 文件前缀
+            start_after (str): 起始文件名
+            max_keys (int): 每页最大数量
+            end_time (int): 截止时间(10位整数时间戳,仅统计 last_modified < end_time 的文件)
+            include_files (bool): 是否包含文件列表(大数据量时建议设为False)
+
+        Returns:
+            Dict:
+                - total_size_bytes (int): 文件总大小(字节)
+                - total_files (int): 文件总数
+                - total_size_mb (float): 总大小(MB,保留4位小数)
+                - files (List[oss2.models.ObjectInfo]): 所有文件列表(仅当include_files=True时返回)
+        """
+        try:
+            total_size = 0
+            total_files = 0
+            continuation_token = ''
+            all_files = [] if include_files else None
+
+            # 预计算常量
+            MB_BYTES = 1024 * 1024
+
+            while True:
+                result = self.list_objects(
+                    bucket_name=bucket_name,
+                    prefix=prefix,
+                    start_after=start_after,
+                    continuation_token=continuation_token,
+                    max_keys=max_keys
+                )
+
+                current_files = result.get('files', [])
+
+                # 时间过滤逻辑优化
+                if end_time is not None:
+                    should_break = False
+                    filtered_files = []
+
+                    for obj in current_files:
+                        if obj.last_modified < end_time:
+                            filtered_files.append(obj)
+                        else:
+                            # 遇到超过截止时间的文件,标记终止并跳出循环
+                            should_break = True
+                            break
+
+                    current_files = filtered_files
+
+                    # 如果当前页有文件超过截止时间,提前终止
+                    if should_break:
+                        # 累加当前过滤后的文件
+                        total_size += sum(obj.size for obj in current_files)
+                        total_files += len(current_files)
+                        if include_files:
+                            all_files.extend(current_files)
+                        break
+
+                # 使用生成器表达式减少内存使用
+                batch_size = sum(obj.size for obj in current_files)
+                batch_files_count = len(current_files)
+
+                total_size += batch_size
+                total_files += batch_files_count
+
+                if include_files:
+                    all_files.extend(current_files)
+
+                # 终止循环条件优化
+                is_truncated = result.get('is_truncated', False)
+                next_token = result.get('next_continuation_token', '')
+
+                if not is_truncated or (end_time is not None and not next_token):
+                    break
+
+                continuation_token = next_token
+
+            # 字节转MB优化计算
+            total_size_mb = Decimal(total_size) / Decimal(MB_BYTES)
+            total_size_mb = total_size_mb.quantize(Decimal('0.00'), rounding=ROUND_HALF_UP)
+
+            # 构建返回结果
+            result_data = {
+                'total_size_bytes': total_size,
+                'total_files': total_files,
+                'total_size_mb': float(total_size_mb)
+            }
+
+            if include_files:
+                result_data['files'] = all_files
+
+            return result_data
+        except Exception as e:
+            error_msg = str(e)
+            LOGGER.error(f"{prefix} 查询oss文件列表失败, errMsg: {error_msg}")
+            return {}