import time
import pytz
from datetime import datetime
from datetime import time as Time
from Model.models import AppAdvertiseCampaign, DeviceTypeModel, CountryModel, Device_User, OpenScreenCampaign
from Ansjer.config import SERVER_TYPE
from django.views import View
from django.db.models import Prefetch
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject


class AppCampaignView(View):
    """Django view serving app advertising campaigns and recording open-screen ad behaviour.

    The URL keyword argument ``operation`` selects the handler:
    ``appGetCampaigns`` or ``recordUserBehavior``.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET: dispatch using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request, request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: dispatch using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request, request.POST, operation)

    def validation(self, request, request_dict, operation):
        """Authenticate the request token and route to the requested operation.

        @param request: Django request; the token is read from HTTP_AUTHORIZATION
        @param request_dict: request parameters (GET or POST mapping)
        @param operation: operation name taken from the URL
        @return: response produced by the handler, the token error code when
                 the token check fails, or code 414 for an unknown operation
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language)
        tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        userID = tko.userID
        if operation == 'appGetCampaigns':
            return self.app_get_campaigns(userID, request_dict, response)
        elif operation == 'recordUserBehavior':
            # The handler requires the response object; omitting it raised TypeError.
            return self.record_user_behavior(userID, request_dict, response)
        else:
            return response.json(414)

    @staticmethod
    def _today_timestamp_range():
        """Return (start, end) integer timestamps bounding the current day.

        The day is taken from the naive ``datetime.now()``, i.e. the server's
        local clock, running from 00:00:00 to 23:59:59 inclusive.
        """
        today = datetime.now().date()
        start_timestamp = int(datetime.combine(today, Time.min).timestamp())
        end_timestamp = int(datetime.combine(today, Time.max).timestamp())
        return start_timestamp, end_timestamp

    @staticmethod
    def _midnight_timestamp(timestamp, timezone):
        """Return the timestamp of midnight (in ``timezone``) of the day containing ``timestamp``."""
        local_dt = datetime.fromtimestamp(timestamp, pytz.utc).astimezone(timezone)
        return local_dt.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()

    @staticmethod
    def _seconds_into_day(timestamp, timezone):
        """Return the time of day of ``timestamp`` in ``timezone`` as seconds since midnight.

        Seconds are truncated to whole minutes (only hour and minute are used).
        """
        local_dt = datetime.fromtimestamp(timestamp, pytz.utc).astimezone(timezone)
        return (local_dt.hour * 60 + local_dt.minute) * 60

    def app_get_campaigns(self, userID, request_dict, response):
        """Return the active advertising campaigns applicable to the user.

        Also makes sure an OpenScreenCampaign row exists for the user for the
        current (server-local) day.

        @param userID: id of the authenticated user
        @param request_dict: request parameters; reads ``tz`` (e.g. "+8:00",
               default "+0:00") and ``appType`` (default 0)
        @param response: response object
        @return: response with code 0 and the campaign list, or code 500 with
                 the failing line and error on an exception
        """
        tz = request_dict.get('tz', '+0:00')
        app_type = request_dict.get('appType', 0)
        if SERVER_TYPE in ('Ansjer.cn_config.formal_settings', 'Ansjer.cn_config.test_settings'):
            s3_url = "https://ansjerfilemanager.s3.cn-northwest-1.amazonaws.com.cn/app/campaign/"
        else:
            s3_url = "https://ansjerfilemanager.s3.amazonaws.com/app/campaign/"

        timezone = self.get_timezone_offset(tz)

        # Timestamp range of the current day; bounds are inclusive so records
        # stamped exactly at 00:00:00 or 23:59:59 are not missed.
        start_timestamp, end_timestamp = self._today_timestamp_range()
        open_screen_campaign_qs = OpenScreenCampaign.objects.filter(
            user_id=userID,
            create_time__gte=start_timestamp,
            create_time__lte=end_timestamp,
        )
        if not open_screen_campaign_qs.exists():
            now = int(time.time())
            OpenScreenCampaign.objects.create(user_id=userID, update_time=now, create_time=now)

        try:
            country_prefetch = Prefetch('country', queryset=CountryModel.objects.only('id'),
                                        to_attr='country_list')
            device_type_prefetch = Prefetch('device_type', queryset=DeviceTypeModel.objects.only('type'),
                                            to_attr='device_type_list')
            app_advertise_campaign_qs = AppAdvertiseCampaign.objects.prefetch_related(
                country_prefetch, device_type_prefetch)

            device_info = Device_User.objects.filter(userID=userID).values("region_country").first()
            # No matching user row: treat as unknown country instead of failing on None.
            country_id = device_info.get('region_country') if device_info is not None else 0
            if country_id != 0:
                app_advertise_campaign_qs = app_advertise_campaign_qs.filter(country__id=country_id, status=1)
            else:
                app_advertise_campaign_qs = app_advertise_campaign_qs.filter(status=1, unknown_country=1)

            if app_type is not None:
                app_advertise_campaign_qs = app_advertise_campaign_qs.filter(app_bundle_type=app_type)

            # Returned per campaign: name, type, start/end date, images, campaign link.
            campaigns_list = []
            for campaign in app_advertise_campaign_qs:
                # Date range: midnight timestamps in the client's timezone.
                campaigns_start_date = self._midnight_timestamp(campaign.campaign_start_date, timezone)
                campaign_end_date = self._midnight_timestamp(campaign.campaign_end_date, timezone)

                # Daily display window: seconds since midnight in the client's timezone.
                campaign_start_firstday = self._seconds_into_day(
                    campaign.campaign_start_date + campaign.campaign_show_stime, timezone)
                campaign_end_firstday = self._seconds_into_day(
                    campaign.campaign_start_date + campaign.campaign_show_etime, timezone)

                # Carousel banners: prefix each image with the bucket URL on a copy,
                # so the model instance's own data is left untouched.
                banner_campaign = [
                    {**banner, "image": s3_url + banner["image"]}
                    for banner in (campaign.banner_campaign or {}).values()
                ]

                campaigns_list.append({
                    'campaign_id': campaign.id,
                    'image_url': s3_url + campaign.image_url,
                    'banner_campaign': banner_campaign,
                    'campaign_url': campaign.campaign_url,
                    'campaign_name': campaign.campaign_name,
                    'campaign_type': campaign.campaign_type,
                    'start_date': campaigns_start_date,
                    'end_date': campaign_end_date,
                    'start_time': campaign_start_firstday,
                    'end_time': campaign_end_firstday,
                    'device_types': [device.type for device in campaign.device_type_list],
                })

            return response.json(0, {
                'interval_time': 86400,
                'campaigns': campaigns_list
            })
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    def record_user_behavior(self, userID, request_dict, response):
        """
        Record the user's behaviour on the open-screen advertisement.

        Updates today's most recent record that still has status 0, or creates
        a new record when there is none.

        @param userID: id of the authenticated user
        @param request_dict: request parameters; reads ``status`` and ``campaign_id``
        @param response: response object
        @return: response with code 0 on success, 444 when a parameter is
                 missing, 500 with the failing line and error on an exception
        """
        status = request_dict.get('status', None)  # 1 = not skipped, 2 = skipped, 3 = ad clicked
        campaign_id = request_dict.get('campaign_id', None)
        user_id = userID

        if not all([status, campaign_id]):
            return response.json(444)

        # Timestamp range of the current day (inclusive bounds).
        start_timestamp, end_timestamp = self._today_timestamp_range()

        try:
            now = int(time.time())
            # Most recent record of today that has not been given a status yet.
            latest_campaign = OpenScreenCampaign.objects.filter(
                user_id=user_id,
                create_time__gte=start_timestamp,
                create_time__lte=end_timestamp,
                status=0,
            ).order_by('-create_time').first()

            if latest_campaign is None:
                # Nothing to update: create a new record.
                OpenScreenCampaign.objects.create(
                    user_id=user_id,
                    status=status,
                    campaign_id_id=campaign_id,
                    create_time=now,
                    update_time=now
                )
            else:
                latest_campaign.status = status
                latest_campaign.campaign_id_id = campaign_id
                latest_campaign.update_time = now
                latest_campaign.save()

            return response.json(0)
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    def get_timezone_offset(self, tz):
        """
        Convert an offset string such as "+8:00" or "-5:30" into a fixed-offset timezone.

        Both ":" and "." are accepted between hours and minutes. The sign may
        be "+", "-", a space (a "+" decoded from a URL) or absent; anything
        other than "-" means a positive offset. Malformed or out-of-range
        input falls back to UTC.

        @param tz: offset string
        @return: pytz fixed-offset tzinfo
        """
        try:
            text = str(tz).strip()
            sign = -1 if text.startswith('-') else 1
            text = text.lstrip('+-').strip()
            hours_text, _, minutes_text = text.replace('.', ':').partition(':')
            total_minutes = int(hours_text) * 60 + int(minutes_text or 0)
            # pytz.FixedOffset raises ValueError for offsets of a day or more.
            return pytz.FixedOffset(sign * total_minutes)
        except (TypeError, ValueError):
            return pytz.FixedOffset(0)