import time
import pytz
import json
import logging
from datetime import datetime

from Model.models import AppAdvertiseCampaign, DeviceTypeModel, CountryModel
from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, \
    AWS_SES_ACCESS_REGION, UNUSED_SERIAL_REDIS_LIST
from django.core.paginator import EmptyPage, Paginator
from django.views import View
from django.db.models import Prefetch
from django.db.models import Q
from Object.AWS.AmazonS3Util import AmazonS3Util
from Object.ResponseObject import ResponseObject

logger = logging.getLogger(__name__)

# S3 bucket that stores campaign posters.
_CAMPAIGN_BUCKET = 'ansjerfilemanager'
# Key prefix inside the bucket; the model's image_url is stored relative to it.
_CAMPAIGN_KEY_PREFIX = 'app/campaign/'
# Public URL prefix used when building poster URLs for the admin list.
_CAMPAIGN_URL_PREFIX = 'https://ansjerfilemanager.s3.amazonaws.com/app/campaign/'
# Pseudo country name meaning "unknown region". It is persisted through the
# ``unknown_country`` flag on the campaign rather than as a CountryModel relation.
_UNKNOWN_COUNTRY = "未知地区"


def _parse_json_list(raw):
    """Decode an optional JSON-array request parameter.

    @param raw: raw parameter value, or None when the parameter was not sent
    @return: the decoded list, or None when the parameter is missing or empty
    @raise ValueError: when the value is not valid JSON or is not a JSON array
    """
    if raw is None or raw == '':
        return None
    parsed = json.loads(raw)  # json.JSONDecodeError is a ValueError subclass
    if not isinstance(parsed, list):
        raise ValueError('expected a JSON array')
    return parsed


class AppCampaignView(View):
    """Endpoints for managing app advertising campaigns and serving them to the app."""

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the ``operation`` URL keyword argument."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.GET
        return self.validation(request_dict, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the ``operation`` URL keyword argument."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.POST
        return self.validation(request_dict, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Route the request to the handler matching ``operation``.
        @param request_dict: request parameters (GET or POST data)
        @param request: the Django request object (needed for uploaded files)
        @param operation: operation name taken from the URL
        @return: the handler's response, or a 404 response for unknown operations
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')
        if operation == 'getCampaignList':  # list campaigns
            return self.get_campaign_list(request_dict, response)
        elif operation == 'addCampaign':  # create a campaign
            return self.add_campaign(request, request_dict, response)
        elif operation == 'updateCampaign':  # update a campaign
            return self.update_campaign(request, request_dict, response)
        elif operation == 'deleteCampaign':  # delete a campaign
            return self.delete_campaign(request_dict, response)
        elif operation == 'switchCampaign':  # change a campaign's status
            return self.switch_campaign(request_dict, response)
        elif operation == 'recordUserBehavior':
            return self.record_user_behavior(request_dict, response)
        elif operation == 'appGetCampaigns':
            return self.app_get_campaigns(request_dict, response)
        else:
            return response.json(404)

    def get_campaign_list(self, request_dict, response):
        """
        Return a filtered, paginated list of advertising campaigns.
        @param request_dict: request parameters; optional ``campaign_name``,
            ``campaign_country`` (comma-separated country names), ``status``,
            ``pageNo`` (default 1) and ``pageSize`` (default 20)
        @param response: response object
        @return: response whose data holds ``list`` (campaigns on the page) and
            ``total`` (number of matching campaigns); 444 when the paging
            parameters are not positive integers
        """
        campaign_name = request_dict.get('campaign_name', None)
        campaign_country = request_dict.get('campaign_country', None)
        status = request_dict.get('status', None)

        # Reject malformed paging parameters instead of letting Paginator raise.
        try:
            page_no = int(request_dict.get('pageNo', 1))
            page_size = int(request_dict.get('pageSize', 20))
        except (TypeError, ValueError):
            return response.json(444)
        if page_no < 1 or page_size < 1:
            return response.json(444)

        # Prefetch related countries and device types to avoid per-row queries.
        country_prefetch = Prefetch('country', queryset=CountryModel.objects.only('country_name'),
                                    to_attr='country_list')
        device_type_prefetch = Prefetch('device_type', queryset=DeviceTypeModel.objects.only('name'),
                                        to_attr='device_type_list')
        campaign_qs = AppAdvertiseCampaign.objects.prefetch_related(country_prefetch, device_type_prefetch)

        # Filters
        if campaign_name:
            campaign_qs = campaign_qs.filter(campaign_name=campaign_name)
        if status:
            campaign_qs = campaign_qs.filter(status=status)
        if campaign_country:
            campaign_country_list = campaign_country.split(',')
            country_filter = Q(country__country_name__in=campaign_country_list)
            # Only widen the match to "unknown region" campaigns when that
            # pseudo country was actually requested. OR-ing unconditionally with
            # unknown_country=0 would match nearly every campaign.
            if _UNKNOWN_COUNTRY in campaign_country_list:
                country_filter |= Q(unknown_country=1)
            campaign_qs = campaign_qs.filter(country_filter).distinct()

        # Pagination
        paginator = Paginator(campaign_qs.order_by('id'), page_size)
        try:
            page_items = paginator.page(page_no).object_list
        except EmptyPage:
            # A page past the end yields an empty list rather than an error.
            page_items = []

        campaign_list = []
        for campaign in page_items:
            country_names = [country.country_name for country in campaign.country_list]
            if campaign.unknown_country != 0:
                country_names.append(_UNKNOWN_COUNTRY)
            campaign_list.append({
                'id': campaign.id,
                'image_url': _CAMPAIGN_URL_PREFIX + campaign.image_url,
                'campaign_name': campaign.campaign_name,
                'campaign_url': campaign.campaign_url,
                'campaign_type': campaign.campaign_type,
                'status': campaign.status,
                'campaign_start_date': campaign.campaign_start_date,
                'campaign_end_date': campaign.campaign_end_date,
                'campaign_show_stime': campaign.campaign_show_stime,
                'campaign_show_etime': campaign.campaign_show_etime,
                'countries': ",".join(country_names),
                'device_types': ",".join([device.name for device in campaign.device_type_list]),
            })

        data = {
            'list': campaign_list,
            'total': paginator.count,
        }
        return response.json(0, data)

    def add_campaign(self, request, request_dict, response):
        """
        Create a new advertising campaign and upload its poster to S3.
        @param request: Django request; the poster is read from ``request.FILES['posterFile']``
        @param request_dict: request parameters; ``device_type_list`` and
            ``country_name_list`` are JSON arrays of names
        @param response: response object
        @return: 0 on success, 444 when a required parameter is missing or
            malformed, 174 when the upload or the database write fails
        """
        poster_file = request.FILES.get('posterFile', None)
        campaign_name = request_dict.get('campaign_name', None)
        campaign_url = request_dict.get('campaign_url', None)
        campaign_type = request_dict.get('campaign_type', None)
        status = request_dict.get('status', None)
        campaign_start_time = request_dict.get('campaign_start_time', None)
        campaign_end_time = request_dict.get('campaign_end_time', None)
        campaign_show_stime = request_dict.get('campaign_show_stime', 0)
        campaign_show_etime = request_dict.get('campaign_show_etime', 86399)

        # Missing or malformed list parameters are a client error, not a crash.
        try:
            device_type_names = _parse_json_list(request_dict.get('device_type_list', None))
            country_name_list = _parse_json_list(request_dict.get('country_name_list', None))
        except ValueError:
            return response.json(444)

        if not all([campaign_name, campaign_type, device_type_names, campaign_url, country_name_list,
                    campaign_start_time, campaign_end_time, poster_file]):
            return response.json(444)

        # "Unknown region" is stored as a flag on the campaign, not as a country relation.
        unknown_country = 1 if _UNKNOWN_COUNTRY in country_name_list else 0
        country_name_list = [name for name in country_name_list if name != _UNKNOWN_COUNTRY]

        try:
            now = int(time.time())
            image_path = f'OpenScreenAdvertise/{now}_{poster_file.name}'

            # Upload the poster; it becomes reachable at
            # https://ansjerfilemanager.s3.amazonaws.com/app/campaign/OpenScreenAdvertise/XXX.jpg
            s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[1], AWS_SECRET_ACCESS_KEY[1], AWS_SES_ACCESS_REGION)
            s3.upload_file_obj(
                _CAMPAIGN_BUCKET,
                _CAMPAIGN_KEY_PREFIX + image_path,
                poster_file,
                {'ContentType': poster_file.content_type, 'ACL': 'public-read'})

            new_campaign = AppAdvertiseCampaign.objects.create(
                image_url=image_path,
                campaign_name=campaign_name,
                campaign_url=campaign_url,
                campaign_type=campaign_type,
                status=status,
                unknown_country=unknown_country,
                campaign_start_date=campaign_start_time,
                campaign_end_date=campaign_end_time,
                campaign_show_stime=campaign_show_stime,
                campaign_show_etime=campaign_show_etime,
                create_time=now,
                update_time=now,
            )

            # Link device types and countries by name.
            new_campaign.device_type.add(*DeviceTypeModel.objects.filter(name__in=device_type_names))
            new_campaign.country.add(*CountryModel.objects.filter(country_name__in=country_name_list))
        except Exception:
            logger.exception('add_campaign failed')
            return response.json(174)

        return response.json(0)

    def update_campaign(self, request, request_dict, response):
        """
        Update an existing campaign; only the parameters that are sent are changed.
        @param request: Django request; an optional new poster is read from
            ``request.FILES['posterFile']``
        @param request_dict: request parameters; ``id`` is required, while
            ``device_type_list`` and ``country_name_list`` are optional JSON arrays
        @param response: response object
        @return: 0 on success, 444 when ``id`` is missing or a list parameter is
            malformed, 177 when the update fails
        """
        campaign_id = request_dict.get('id', None)
        poster_file = request.FILES.get('posterFile', None)
        campaign_start_time = request_dict.get('campaign_start_time', None)
        campaign_end_time = request_dict.get('campaign_end_time', None)

        # Both lists are optional here; a missing parameter yields None.
        try:
            device_type_names = _parse_json_list(request_dict.get('device_type_list', None))
            country_name_list = _parse_json_list(request_dict.get('country_name_list', None))
        except ValueError:
            return response.json(444)

        if not campaign_id:
            return response.json(444)

        # Scalar fields: request value -> model attribute. None means "not sent".
        field_updates = {
            'campaign_name': request_dict.get('campaign_name', None),
            'campaign_url': request_dict.get('campaign_url', None),
            'campaign_type': request_dict.get('campaign_type', None),
            'campaign_start_date': campaign_start_time,
            'campaign_end_date': campaign_end_time,
            'campaign_show_stime': request_dict.get('campaign_show_stime', None),
            'campaign_show_etime': request_dict.get('campaign_show_etime', None),
        }

        try:
            now = int(time.time())
            campaign = AppAdvertiseCampaign.objects.get(id=campaign_id)

            unknown_selected = False
            if country_name_list is not None:
                unknown_selected = _UNKNOWN_COUNTRY in country_name_list
                campaign.unknown_country = 1 if unknown_selected else 0
                country_name_list = [name for name in country_name_list if name != _UNKNOWN_COUNTRY]

            if poster_file is not None:
                s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[1], AWS_SECRET_ACCESS_KEY[1], AWS_SES_ACCESS_REGION)
                old_image_path = campaign.image_url
                new_image_path = f'OpenScreenAdvertise/{now}_{poster_file.name}'
                # Upload first so a failed upload never leaves the campaign
                # pointing at an image that has already been deleted.
                s3.upload_file_obj(
                    _CAMPAIGN_BUCKET,
                    _CAMPAIGN_KEY_PREFIX + new_image_path,
                    poster_file,
                    {'ContentType': poster_file.content_type, 'ACL': 'public-read'}
                )
                # image_url is stored relative to the key prefix, so the full
                # object key must be rebuilt; the bare file name is not a key.
                if old_image_path and old_image_path != new_image_path:
                    s3.delete_obj(_CAMPAIGN_BUCKET, _CAMPAIGN_KEY_PREFIX + old_image_path)
                campaign.image_url = new_image_path

            for field_name, value in field_updates.items():
                if value is not None:
                    setattr(campaign, field_name, value)

            # Many-to-many: device types
            if device_type_names:
                campaign.device_type.set(DeviceTypeModel.objects.filter(name__in=device_type_names))

            # Many-to-many: countries. Also runs when only "unknown region" was
            # selected, so previously linked countries are cleared in that case.
            if country_name_list or unknown_selected:
                campaign.country.set(CountryModel.objects.filter(country_name__in=country_name_list))

            campaign.update_time = now
            campaign.save()
        except Exception:
            logger.exception('update_campaign failed, id=%s', campaign_id)
            return response.json(177)

        return response.json(0)

    def switch_campaign(self, request_dict, response):
        """
        Set the status of a campaign.
        @param request_dict: request parameters; ``id`` and ``status`` are required
        @param response: response object
        @return: 0 on success, 444 when a parameter is missing or the update fails
        """
        campaign_id = request_dict.get('id')
        status = request_dict.get('status')
        if not campaign_id or status is None:
            return response.json(444)
        try:
            campaign = AppAdvertiseCampaign.objects.get(pk=campaign_id)
            campaign.status = status
            campaign.update_time = int(time.time())
            campaign.save()
            return response.json(0)
        except Exception:
            logger.exception('switch_campaign failed, id=%s', campaign_id)
            return response.json(444)

    def delete_campaign(self, request_dict, response):
        """
        Delete a campaign together with its device-type and country relations.
        @param request_dict: request parameters; ``id`` is required
        @param response: response object
        @return: 0 on success, 444 when ``id`` is missing or the deletion fails
        """
        campaign_id = request_dict.get('id')
        if not campaign_id:
            return response.json(444)
        try:
            campaign = AppAdvertiseCampaign.objects.get(id=campaign_id)
            campaign.device_type.clear()
            campaign.country.clear()
            campaign.delete()
            return response.json(0)
        except Exception:
            logger.exception('delete_campaign failed, id=%s', campaign_id)
            return response.json(444)

    def app_get_campaigns(self, request_dict, response):
        """
        Return the campaigns whose status is 1, for display in the app.
        @param request_dict: request parameters (currently unused)
        @param response: response object
        @return: response whose data holds ``campaigns``, a list of dicts with the
            poster path, name, link, type, start/end dates and daily show window
        """
        # TODO: per-timezone adjustment of the returned times is not implemented;
        # values are returned exactly as stored.
        # image_url is returned as stored, i.e. relative to the app/campaign/ prefix.
        campaign_rows = AppAdvertiseCampaign.objects.filter(status=1).values(
            'image_url',
            'campaign_name',
            'campaign_url',
            'campaign_type',
            'campaign_start_date',
            'campaign_end_date',
            'campaign_show_stime',
            'campaign_show_etime',
        )
        return response.json(0, {
            'campaigns': list(campaign_rows)
        })

    def record_user_behavior(self, request_dict, response):
        """
        Record user behaviour (not implemented yet).
        @param request_dict: request parameters
        @param response: response object
        @return: a success response; nothing is recorded
        """
        # TODO: implement. A response is returned because Django raises an
        # error when a view returns None.
        return response.json(0)