Эх сурвалжийг харах

每日统计储存量
每月统计运营利润和成本

peng 1 жил өмнө
parent
commit
a1be14437d

+ 119 - 57
Controller/Cron/CronTaskController.py

@@ -29,7 +29,7 @@ from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unus
     CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
     CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
     CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, IcloudService, \
     CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, IcloudService, \
     Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo, AbnormalOrder, DailyReconciliation, \
     Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo, AbnormalOrder, DailyReconciliation, \
-    CustomizedPush, UIDCompanySerialModel, UIDModel, LogModel, OperatingCosts
+    CustomizedPush, UIDCompanySerialModel, UIDModel, LogModel, OperatingCosts, UidBucketStatistics
 from Object.AWS.AmazonS3Util import AmazonS3Util
 from Object.AWS.AmazonS3Util import AmazonS3Util
 from Object.RedisObject import RedisObject
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
 from Object.ResponseObject import ResponseObject
@@ -800,6 +800,8 @@ class CronCollectDataView(View):
             return self.collect_flow_info(response)
             return self.collect_flow_info(response)
         elif operation == 'collectOperatingCosts':  # 定时运营成本
         elif operation == 'collectOperatingCosts':  # 定时运营成本
             return self.collect_operating_costs(response)
             return self.collect_operating_costs(response)
+        elif operation == 'collectObjSize':  # 定时设备s3存储量
+            return self.collect_obj_size(response)
         else:
         else:
             return response.json(404)
             return response.json(404)
 
 
@@ -1129,70 +1131,130 @@ class CronCollectDataView(View):
     @staticmethod
     @staticmethod
     def collect_operating_costs(response):
     def collect_operating_costs(response):
         try:
         try:
-            created_time = int(time.time())
             today = datetime.datetime.today()
             today = datetime.datetime.today()
-            end_time = datetime.datetime(today.year, today.month, 1)
-            start_time = end_time - relativedelta(months=1)
+            start_time = datetime.datetime(today.year, today.month, 1)
+            end_time = start_time + relativedelta(months=1)
             start_time_stamp = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
             start_time_stamp = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
             end_time_stamp = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
             end_time_stamp = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
-            order_list = UID_Bucket.objects.filter(endTime__gte=start_time_stamp).values_list('orderId', flat=True)
-            order_qs = Order_Model.objects.filter(orderID__in=order_list).values('trade_no', 'price', 'UID', 'payTime',
-                                                                                 'order_type', 'rank__expire',
-                                                                                 'payType', 'uid_bucket_id', 'channel',
-                                                                                 'rank__bucket__bucket',
-                                                                                 'orderID')
-            s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
-            paypal_api = paypalrestsdk.Api(PAYPAL_CRD)
-            for order in order_qs:
-                price = float(order['price'])
-                if order['payType'] == 1:  # paypal支付查询手续费
-                    paypal_url = 'v1/reporting/transactions?start_date={}-{}-{}T00:00:00-0000&end_date={}-{}-{}T00:00:00-0000&transaction_id={}&fields=all&page_size=100&page=1'.format(
-                        start_time.year, start_time.month, start_time.day, end_time.year, end_time.month, end_time.day,
-                        order['trade_no'])
-                    paypal_order_list = paypal_api.get(paypal_url)
-                    fee = paypal_order_list
-                else:
-                    fee = 0
-                bucket_qs = UID_Bucket.objects.filter(id=order['uid_bucket_id']).values('endTime')
-                if bucket_qs[0]['endTime'] > end_time_stamp:  # 当月结算天数
-                    settlement_days = (end_time - start_time).days
-                else:
-                    settlement_days = (bucket_qs[0]['endTime'] - start_time_stamp) / 86400
-                day_average_price = price / ((bucket_qs[0]['endTime'] - order['payTime']) / 86400)  # 收入分摊/天
-                month_average_price = day_average_price * settlement_days  # 收入分摊/月
-                monthly_income = (price - fee) / settlement_days  # 当月结算收入
-                remaining_usage_time = (bucket_qs[0]['endTime'] - end_time_stamp) / 86400  # 剩余使用时间
-                # path = '{uid}/vod{channel}'.format(uid=order['uid'], channel=order['channel'])
-                # s3_result = s3_obj.get_object_list(order['rank__bucket__bucket'], path,
-                #                                    path + '/{}'.format(start_time_stamp))
-                # actual_storage = 0
-                # actual_api = 0
-                # for item in s3_result:
-                #     temp_time = int(item['Key'].split('/')['2'])
-                #     if temp_time < end_time_stamp:
-                #         actual_storage += item['Size']
-                #         actual_api += 1
-                with transaction.atomic():
-                    operating_costs_qs = OperatingCosts.objects.filter(order_id=order['orderID'], time=start_time)
-                    if operating_costs_qs.exists():
-                        operating_costs_qs.update(fee=fee, day_average_price=day_average_price,
-                                                  month_average_price=month_average_price,
-                                                  monthly_income=monthly_income,
-                                                  settlement_days=settlement_days,
-                                                  remaining_usage_time=remaining_usage_time)
+            thread = threading.Thread(target=CronCollectDataView.thread_collect_operating_costs,
+                                      args=(start_time_stamp, end_time_stamp, start_time, end_time))
+            thread.start()  # 启动线程
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def thread_collect_operating_costs(start_time_stamp, end_time_stamp, start_time, end_time):
+        try:
+            operating_costs_qs = OperatingCosts.objects.filter(time=start_time_stamp).values('order_id', 'end_time',
+                                                                                             'uid')
+            for item in operating_costs_qs:
+                order_qs = Order_Model.objects.filter(orderID=item['order_id'], UID=item['uid']).values('price',
+                                                                                                        'payTime',
+                                                                                                        'order_type',
+                                                                                                        'rank__expire',
+                                                                                                        'fee',
+                                                                                                        'payType')
+                if order_qs.exists():
+                    order = order_qs[0]
+                    if order['order_type'] not in [0, 1]:
+                        continue
+                    price = float(order['price'])
+                    if order['payType'] in [2, 3]:
+                        fee = 0.0054 * price
+                    else:
+                        fee = float(order['fee']) if order['fee'] else 0
+                    if item['end_time'] > end_time_stamp:  # 当月结算天数
+                        settlement_days = (end_time - start_time).days
+                        remaining_usage_time = (item['end_time'] - end_time_stamp) // 86400  # 剩余使用时间
                     else:
                     else:
-                        OperatingCosts.objects.create(order_id=order['orderID'], fee=fee,
-                                                      day_average_price=day_average_price,
-                                                      month_average_price=month_average_price,
-                                                      monthly_income=monthly_income,
-                                                      settlement_days=settlement_days,
-                                                      remaining_usage_time=remaining_usage_time,
-                                                      created_time=created_time,
-                                                      time=start_time)
+                        settlement_days = (item['end_time'] - start_time_stamp) // 86400
+                        remaining_usage_time = 0
+                    day_average_price = round(price / (order['rank__expire'] * 30), 2)  # 收入分摊/天
+                    month_average_price = round(day_average_price * settlement_days, 2)  # 收入分摊/月
+                    monthly_income = round((price - fee) / (order['rank__expire'] * 30) * settlement_days, 2)  # 当月结算收入
+                    if item['end_time'] < end_time_stamp:
+                        uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
+                                                                                   time__lt=item['end_time'],
+                                                                                   uid=item['uid'])
+                    else:
+                        uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
+                                                                                   time__lt=end_time_stamp,
+                                                                                   uid=item['uid'])
+                    result = uid_bucket_statistics.aggregate(size=Sum('storage_size'), api_count=Sum('api_count'))
+                    actual_storage = round(result['size'], 2) if result['size'] else 0
+                    actual_api = result['api_count'] if result['api_count'] else 0
+                    OperatingCosts.objects.filter(time=start_time_stamp, order_id=item['order_id'],
+                                                  uid=item['uid']).update(day_average_price=day_average_price,
+                                                                          month_average_price=month_average_price,
+                                                                          monthly_income=monthly_income,
+                                                                          actual_storage=actual_storage,
+                                                                          settlement_days=settlement_days,
+                                                                          actual_api=actual_api,
+                                                                          remaining_usage_time=remaining_usage_time)
+            print('结束')
+        except Exception as e:
+            LOGGER.info(
+                'thread_collect_operating_costs接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def collect_obj_size(response):
+        try:
+            today = datetime.datetime.today()
+            end_time = datetime.datetime(today.year, today.month, today.day)
+            first_date = datetime.datetime(today.year, today.month, 1)
+            start_time = end_time - datetime.timedelta(days=1)
+            start_time_stamp = int(start_time.timestamp())
+            end_time_stamp = int(end_time.timestamp())
+            first_date_stamp = int(first_date.timestamp())
+            thread = threading.Thread(target=CronCollectDataView.thread_collect_obj_size,
+                                      args=(start_time_stamp, end_time_stamp, first_date_stamp))
+            thread.start()  # 启动线程
+
             return response.json(0)
             return response.json(0)
         except Exception as e:
         except Exception as e:
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
 
+    @staticmethod
+    def thread_collect_obj_size(start_time, end_time, first_date):
+        try:
+            creat_time = int(time.time())
+            uid_list = UidBucketStatistics.objects.filter(time=start_time).values_list('uid', flat=True)
+            uid_vod = UID_Bucket.objects.filter(Q(endTime__gte=start_time), ~Q(uid__in=uid_list)).values('uid',
+                                                                                                         'bucket__bucket',
+                                                                                                         'orderId',
+                                                                                                         'channel',
+                                                                                                         'endTime')
+            s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
+            for item in uid_vod:
+                path = '{uid}/vod{channel}'.format(uid=item['uid'], channel=item['channel'])
+                s3_result = s3_obj.get_object_list(item['bucket__bucket'], path,
+                                                   path + '/{}'.format(start_time), end_time)
+                actual_storage = 0
+                actual_api = 0
+                for obj in s3_result:
+                    temp_time = int(obj['Key'].split('/')[2])
+                    if temp_time < end_time:
+                        actual_storage += obj['Size']
+                        actual_api += 1
+                actual_storage = round(actual_storage / 1024 / 1024, 2)
+                with transaction.atomic():
+                    if actual_api:
+                        UidBucketStatistics.objects.create(uid=item['uid'], storage_size=actual_storage,
+                                                           api_count=actual_api,
+                                                           created_time=creat_time,
+                                                           time=start_time)
+                    operating_costs_qs = OperatingCosts.objects.filter(order_id=item['orderId'], uid=item['uid'],
+                                                                       time=first_date)
+                    if not operating_costs_qs.exists():
+                        OperatingCosts.objects.create(order_id=item['orderId'], uid=item['uid'],
+                                                      created_time=creat_time, time=first_date,
+                                                      end_time=item['endTime'])
+                print(actual_storage, actual_api)
+            print('结束')
+        except Exception as e:
+            LOGGER.info('统计s3信息异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
     @staticmethod
     @staticmethod
     def collect_icloud_order(response):
     def collect_icloud_order(response):
         try:
         try: