import time
import json
import uuid

from Model.models import AppAdvertiseCampaign, DeviceTypeModel, CountryModel, OpenScreenCampaign, UserSetStatus, \
    Device_User, RegionRestriction
from Ansjer.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, SERVER_TYPE, LOGGER
from django.core.paginator import Paginator
from django.views import View
from django.db.models import Prefetch
from django.db.models import Q, F
from Object.AWS.AmazonS3Util import AmazonS3Util
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject


class CampaignView(View):
    """Admin endpoints for app advertising campaigns (splash posters and banners).

    Requests are routed by the ``operation`` URL keyword argument; see
    ``validation`` for the list of supported operations.
    """

    # Pseudo country name used by clients to mean "region unknown". It is not
    # a CountryModel row; it is stored as the ``unknown_country`` flag instead.
    UNKNOWN_COUNTRY_NAME = "未知地区"
    S3_BUCKET = "ansjerfilemanager"
    # Message returned when a banner image has no matching link (or vice versa).
    BANNER_PAIRING_ERROR = "图片需要和链接对应"

    def get(self, request, *args, **kwargs):
        """Handle GET requests; parameters are read from the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST requests; parameters are read from the form body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Authenticate the request when required and dispatch it by operation.

        @param request_dict: request parameters (GET or POST)
        @param request: the Django request (needed for headers and files)
        @param operation: operation name taken from the URL
        @return: response produced by the selected handler, or code 414 for
                 an unknown operation
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')

        # These two operations are served without token validation.
        if operation == 'getCountryList':
            return self.get_country_list(response)
        if operation == 'getUserSetStatusList':
            # NOTE(review): this returns user e-mail/phone data without any
            # token check - confirm that this endpoint is meant to be public.
            return self.get_user_set_status_list(request_dict, response)

        tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'), returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang

        if operation == 'getCampaignList':  # list campaigns
            return self.get_campaign_list(request_dict, response)
        if operation == 'addCampaign':  # create a campaign
            return self.add_campaign(request, request_dict, response)
        if operation == 'updateCampaign':  # update a campaign
            return self.update_campaign(request, request_dict, response)
        if operation == 'deleteCampaign':  # soft-delete a campaign
            return self.delete_campaign(request_dict, response)
        if operation == 'switchCampaign':  # enable / disable a campaign
            return self.switch_campaign(request_dict, response)
        if operation == 'getUserBehaviorLog':  # read user behaviour log
            return self.get_user_behavior_log(request_dict, response)
        if operation == 'setStatus':
            return self.set_status(request_dict, response)
        return response.json(414)

    @staticmethod
    def _error_response(response, e):
        """Log an unexpected exception and turn it into the standard 500 reply."""
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        LOGGER.info('campaign request failed: {}'.format(message))
        return response.json(500, message)

    @classmethod
    def _upload_or_raise(cls, files, path_prefix):
        """Upload files to S3 and raise instead of returning None on failure."""
        image_urls = cls.upload_files_to_s3(files, path_prefix)
        if image_urls is None:
            raise RuntimeError('S3 upload failed for {}'.format(path_prefix))
        return image_urls

    def get_campaign_list(self, request_dict, response):
        """
        Return one page of campaigns, optionally filtered.
        @param request_dict: request parameters (campaign_name, campaign_country
                             as a comma separated string, status, pageNo, pageSize)
        @param response: response object
        @return: response containing ``list`` and ``total``
        """
        campaign_name = request_dict.get('campaign_name', None)
        campaign_country = request_dict.get('campaign_country', None)
        status = request_dict.get('status', None)
        page_no = request_dict.get('pageNo', 1)
        page_size = request_dict.get('pageSize', 20)
        try:
            # Prefetch the related countries, loading only their names.
            country_prefetch = Prefetch('country', queryset=CountryModel.objects.only('country_name'),
                                        to_attr='country_list')
            campaign_qs = AppAdvertiseCampaign.objects.prefetch_related(country_prefetch)

            if campaign_name:
                campaign_qs = campaign_qs.filter(campaign_name=campaign_name)
            if status:
                campaign_qs = campaign_qs.filter(status=status)
            if campaign_country:
                requested_countries = campaign_country.split(',')
                country_filter = Q(country__country_name__in=requested_countries)
                # Only widen the filter to "unknown region" campaigns when that
                # pseudo country was actually requested. OR-ing in
                # unknown_country=0 would match every campaign without the flag
                # and defeat the country filter.
                if self.UNKNOWN_COUNTRY_NAME in requested_countries:
                    country_filter |= Q(unknown_country=1)
                campaign_qs = campaign_qs.filter(country_filter).distinct()
            # Status 2 marks soft-deleted campaigns (see delete_campaign).
            campaign_qs = campaign_qs.filter(~Q(status=2))

            paginator = Paginator(campaign_qs.order_by('id'), page_size)
            campaigns = paginator.page(page_no)

            s3_url, _, _ = self.s3_server()
            campaign_list = []
            for campaign in campaigns.object_list:
                country_names = [country.country_name for country in campaign.country_list]
                if campaign.unknown_country != 0:
                    country_names.append(self.UNKNOWN_COUNTRY_NAME)

                # Build absolute banner URLs on copies so the model instance
                # itself is left untouched.
                banners = [dict(banner, image=s3_url + banner["image"])
                           for banner in (campaign.banner_campaign or [])]
                banner_campaign_dict = {str(position): banner
                                        for position, banner in enumerate(banners, start=1)}

                campaign_data = {
                    'id': campaign.id,
                    'image_url': s3_url + campaign.image_url,
                    'pad_image_url': s3_url + campaign.pad_image_url,
                    'banner_campaign': banner_campaign_dict,
                    'campaign_name': campaign.campaign_name,
                    'campaign_url': campaign.campaign_url,
                    'campaign_type': campaign.campaign_type,
                    'status': campaign.status,
                    'campaign_start_date': campaign.campaign_start_date,
                    'campaign_end_date': campaign.campaign_end_date,
                    'campaign_show_stime': campaign.campaign_show_stime,
                    'campaign_show_etime': campaign.campaign_show_etime,
                    'app_bundle_type': campaign.app_bundle_type,
                    'countries': ",".join(country_names),
                    'device_types': campaign.device_type,
                    'ex_device_types': campaign.ex_device_type
                }
                # Campaigns without a splash poster do not expose poster fields.
                if campaign.image_url == "":
                    del campaign_data['image_url']
                    del campaign_data['campaign_url']
                    del campaign_data['pad_image_url']
                campaign_list.append(campaign_data)

            return response.json(0, {'list': campaign_list, 'total': paginator.count})
        except Exception as e:
            return self._error_response(response, e)

    def add_campaign(self, request, request_dict, response):
        """
        Create a new campaign.
        @param request: request carrying the uploaded files
        @param request_dict: request parameters
        @param response: response object
        @return: response object (444 when required input is missing)
        """
        try:
            campaign_name = request_dict.get('campaign_name', None)
            campaign_url = request_dict.get('campaign_url', "")
            # NOTE(review): 2 is also the value used for soft-deleted rows, so a
            # campaign created without an explicit status is hidden from the
            # list - confirm this default is intended.
            status = request_dict.get('status', 2)
            campaign_start_time = request_dict.get('campaign_start_time', None)
            campaign_end_time = request_dict.get('campaign_end_time', None)
            campaign_show_stime = request_dict.get('campaign_show_stime', 0)
            campaign_show_etime = request_dict.get('campaign_show_etime', 86399)
            app_bundle_type = request_dict.get('app_bundle_type', None)
            poster_file = request.FILES.get('posterFile', None)
            poster_pad_file = request.FILES.get('posterPadFile', None)
            banner_files = request.FILES.getlist('bannerFiles', None)
            banner_fields = json.loads(request_dict.get('banner_fields', "[]"))  # banner link/tag entries
            device_type_names = json.loads(request_dict.get('device_type_list', "[]"))  # included device types
            ex_device_type_names = json.loads(request_dict.get('ex_device_type_list', "[]"))  # excluded device types
            country_name_list = json.loads(request_dict.get('country_name_list', "[]"))  # target regions

            required_fields = [campaign_name, campaign_start_time, campaign_end_time, app_bundle_type]
            if any(field is None for field in required_fields):
                return response.json(444)
            if not country_name_list:
                return response.json(444)
            # At least one of poster / banner must be supplied.
            if not poster_file and not banner_files:
                return response.json(444)

            # A device type cannot be both included and excluded.
            if device_type_names and ex_device_type_names:
                if set(device_type_names) & set(ex_device_type_names):
                    return response.json(10, "设备包含和设备不包含不能重复")

            # Every banner image needs its link/tag entry; check before
            # uploading so no file is uploaded and then silently dropped.
            if banner_files and len(banner_fields) < len(banner_files):
                return response.json(10, self.BANNER_PAIRING_ERROR)

            # The "unknown region" pseudo country is stored as a flag on the
            # campaign rather than as a country relation.
            unknown_country = 0
            if self.UNKNOWN_COUNTRY_NAME in country_name_list:
                unknown_country = 1
                country_name_list.remove(self.UNKNOWN_COUNTRY_NAME)

            # Upload files to S3.
            banner_image_urls = self._upload_or_raise(banner_files, "BannerAdvertise") if banner_files else []
            poster_image_url = ""
            if poster_file is not None:
                poster_image_url = self._upload_or_raise(poster_file, "OpenScreenAdvertise")[0]
            pad_image_url = ""
            if poster_pad_file is not None:
                # Upload the pad file itself (not the phone poster).
                pad_image_url = self._upload_or_raise(poster_pad_file, "OpenScreenAdvertise")[0]

            # campaign_type: 1 = splash poster present, 2 = banners present.
            campaign_type = []
            if poster_image_url != "":
                campaign_type.append(1)
            banner_campaign = []
            if banner_image_urls:
                campaign_type.append(2)
                banner_campaign = [{"image": image_url, "url": cam_field["url"], "tag": int(cam_field["tag"])}
                                   for image_url, cam_field in zip(banner_image_urls, banner_fields)]

            now_time = int(time.time())
            new_campaign = AppAdvertiseCampaign.objects.create(
                image_url=poster_image_url,
                pad_image_url=pad_image_url,
                campaign_name=campaign_name,
                campaign_url=campaign_url,
                banner_campaign=banner_campaign,
                campaign_type=campaign_type,
                status=status,
                unknown_country=unknown_country,
                app_bundle_type=app_bundle_type,
                campaign_start_date=campaign_start_time,
                campaign_end_date=campaign_end_time,
                campaign_show_stime=campaign_show_stime,
                campaign_show_etime=campaign_show_etime,
                create_time=now_time,
                update_time=now_time,
                ex_device_type=ex_device_type_names,
                device_type=device_type_names
            )

            # Link the campaign to the matching CountryModel rows.
            country_instances = CountryModel.objects.filter(country_name__in=country_name_list)
            new_campaign.country.add(*country_instances)
            return response.json(0)
        except Exception as e:
            return self._error_response(response, e)

    def update_campaign(self, request, request_dict, response):
        """
        Update an existing campaign.

        All validation happens before any upload, and old S3 objects are only
        removed after the row has been saved, so a rejected or failed request
        never leaves the database pointing at deleted files.
        @param request: request carrying the uploaded files
        @param request_dict: request parameters
        @param response: response object
        @return: response object (444 bad parameters, 173 campaign not found)
        """
        try:
            campaign_id = request_dict.get('id', None)
            campaign_name = request_dict.get('campaign_name', None)
            campaign_url = request_dict.get('campaign_url', None)
            campaign_start_time = request_dict.get('campaign_start_time', None)
            campaign_end_time = request_dict.get('campaign_end_time', None)
            campaign_show_stime = request_dict.get('campaign_show_stime', None)
            campaign_show_etime = request_dict.get('campaign_show_etime', None)
            app_bundle_type = request_dict.get('app_bundle_type', None)
            device_type_names = json.loads(request_dict.get('device_type_list', "[]"))  # included device types
            ex_device_type_names = json.loads(request_dict.get('ex_device_type_list', "[]"))  # excluded device types
            country_name_list = json.loads(request_dict.get('country_name_list', "[]"))  # target regions
            poster_file = request.FILES.get('posterFile', None)
            poster_pad_file = request.FILES.get('posterPadFile', None)
            # 1-based banner positions whose image is being replaced/added.
            sort_files = json.loads(request_dict.get('sort_files', "[]"))
            banner_files = request.FILES.getlist('bannerFiles', None)
            # 1-based banner positions whose link/tag is being replaced/added.
            sort_urls = json.loads(request_dict.get('sort_url', "[]"))
            banner_fields = json.loads(request_dict.get('banner_fields', "[]"))
            # 1-based banner positions to delete.
            sort_banner_del = json.loads(request_dict.get('sort_banner_del', "[]"))
            poster_del = request_dict.get('poster_del', None)

            if not campaign_id:
                return response.json(444)

            campaign = AppAdvertiseCampaign.objects.filter(pk=campaign_id).first()
            if campaign is None:
                return response.json(173)
            if campaign.banner_campaign is None:
                campaign.banner_campaign = []
            if campaign.campaign_type is None:
                campaign.campaign_type = []

            # ---- validation (no side effects yet) ----
            # A device type cannot be both included and excluded.
            if device_type_names and ex_device_type_names:
                if set(device_type_names) & set(ex_device_type_names):
                    return response.json(10, "设备包含和设备不包含不能重复")

            existing_count = len(campaign.banner_campaign)
            if banner_files:
                if len(sort_files) > len(banner_files):
                    return response.json(444)
                # A brand new banner slot needs a link as well as an image.
                if any(sort > existing_count and sort not in sort_urls for sort in sort_files):
                    return response.json(10, self.BANNER_PAIRING_ERROR)
            if banner_fields:
                if len(sort_urls) > len(banner_fields):
                    return response.json(444)
                # A brand new banner slot needs an image as well as a link.
                if any(sort > existing_count and sort not in sort_files for sort in sort_urls):
                    return response.json(10, self.BANNER_PAIRING_ERROR)

            # S3 objects that become obsolete; removed only after save().
            stale_paths = []

            # "Unknown region" is stored as a flag, not as a country relation.
            # NOTE(review): an omitted country_name_list decodes to [], so this
            # resets the flag to 0 unless the parameter is an explicit JSON null.
            if country_name_list is not None:
                if self.UNKNOWN_COUNTRY_NAME in country_name_list:
                    campaign.unknown_country = 1
                    country_name_list.remove(self.UNKNOWN_COUNTRY_NAME)
                else:
                    campaign.unknown_country = 0

            # Splash poster image.
            if poster_file is not None:
                new_poster_url = self._upload_or_raise(poster_file, "OpenScreenAdvertise")[0]
                stale_paths.append(campaign.image_url)
                campaign.image_url = new_poster_url
                if 1 not in campaign.campaign_type:
                    campaign.campaign_type.append(1)

            # Splash poster image for pads.
            if poster_pad_file is not None:
                new_pad_url = self._upload_or_raise(poster_pad_file, "OpenScreenAdvertise")[0]
                stale_paths.append(campaign.pad_image_url)
                campaign.pad_image_url = new_pad_url

            # Replace / add banner images.
            if banner_files:
                banner_image_urls = self._upload_or_raise(banner_files, "BannerAdvertise")
                for sort, image_url in zip(sort_files, banner_image_urls):
                    if sort <= len(campaign.banner_campaign):
                        stale_paths.append(campaign.banner_campaign[sort - 1].get("image"))
                    else:
                        campaign.banner_campaign.append({})
                    campaign.banner_campaign[sort - 1]["image"] = image_url
                if 2 not in campaign.campaign_type:
                    campaign.campaign_type.append(2)

            # Replace / add banner links and tags.
            if banner_fields:
                for field_index, sort in enumerate(sort_urls):
                    if sort > len(campaign.banner_campaign):
                        campaign.banner_campaign.append({})
                    banner_field = banner_fields[field_index]
                    campaign.banner_campaign[sort - 1]["url"] = banner_field["url"]
                    # Stored as int, consistent with add_campaign.
                    campaign.banner_campaign[sort - 1]["tag"] = int(banner_field["tag"])

            # Delete banners, highest position first so indexes stay valid.
            if sort_banner_del:
                for sort in sorted(sort_banner_del, reverse=True):
                    # Positions are 1-based; ignore anything out of range
                    # (0 or negatives would otherwise pop from the end).
                    if 1 <= sort <= len(campaign.banner_campaign):
                        removed_banner = campaign.banner_campaign.pop(sort - 1)
                        stale_paths.append(removed_banner.get("image"))
                if not campaign.banner_campaign and 2 in campaign.campaign_type:
                    campaign.campaign_type.remove(2)

            # Delete the splash poster.
            if poster_del is not None:
                campaign.campaign_url = ""
                stale_paths.append(campaign.image_url)
                stale_paths.append(campaign.pad_image_url)
                campaign.image_url = ""
                campaign.pad_image_url = ""
                if 1 in campaign.campaign_type:
                    campaign.campaign_type.remove(1)

            # A campaign with neither poster nor banners is soft-deleted.
            if not campaign.campaign_type:
                campaign.status = 2

            # Plain fields: only overwrite what the request supplied.
            if campaign_name is not None:
                campaign.campaign_name = campaign_name
            if campaign_url is not None:
                campaign.campaign_url = campaign_url
            if app_bundle_type is not None:
                campaign.app_bundle_type = app_bundle_type
            if campaign_start_time is not None:
                campaign.campaign_start_date = campaign_start_time
            if campaign_end_time is not None:
                campaign.campaign_end_date = campaign_end_time
            if campaign_show_stime is not None:
                campaign.campaign_show_stime = campaign_show_stime
            if campaign_show_etime is not None:
                campaign.campaign_show_etime = campaign_show_etime
            # NOTE(review): omitted device lists decode to [], so these two
            # fields are overwritten unless the parameter is an explicit JSON null.
            if device_type_names is not None:
                campaign.device_type = device_type_names
            if ex_device_type_names is not None:
                campaign.ex_device_type = ex_device_type_names

            # Many-to-many countries: replaced only when a non-empty list remains.
            if country_name_list:
                countries = CountryModel.objects.filter(country_name__in=country_name_list)
                campaign.country.set(countries)

            campaign.update_time = int(time.time())
            campaign.save()

            # The row no longer references these objects; safe to remove them.
            if stale_paths:
                self.del_file_to_s3(stale_paths)
            return response.json(0)
        except Exception as e:
            return self._error_response(response, e)

    def delete_campaign(self, request_dict, response):
        """
        Soft-delete a campaign: remove its S3 images and country links and
        mark the row with status 2 (the row itself is kept).
        @param request_dict: request parameters (``id``)
        @param response: response object
        @return: response object (444 missing id, 173 campaign not found)
        """
        campaign_id = request_dict.get('id')
        if not campaign_id:
            return response.json(444)
        try:
            campaign = AppAdvertiseCampaign.objects.filter(pk=campaign_id).first()
            if campaign is None:
                return response.json(173)

            stale_paths = [campaign.image_url, campaign.pad_image_url]
            stale_paths.extend(banner.get("image") for banner in (campaign.banner_campaign or []))

            # Drop the many-to-many links but keep the campaign row.
            campaign.country.clear()
            campaign.status = 2
            campaign.update_time = int(time.time())
            campaign.save()

            # Remove the files only once the row has been marked deleted.
            self.del_file_to_s3(stale_paths)
            return response.json(0)
        except Exception as e:
            return self._error_response(response, e)

    def switch_campaign(self, request_dict, response):
        """
        Enable or disable a campaign by writing its status.
        @param request_dict: request parameters (``id``, ``status``)
        @param response: response object
        @return: response object (444 when a parameter is missing)
        """
        campaign_id = request_dict.get('id')
        status = request_dict.get('status')
        if not all([campaign_id, status]):
            return response.json(444)
        try:
            AppAdvertiseCampaign.objects.filter(pk=campaign_id).update(status=status, update_time=int(time.time()))
            return response.json(0)
        except Exception as e:
            return self._error_response(response, e)

    def get_user_behavior_log(self, request_dict, response):
        """
        Return splash-campaign user behaviour records updated in a time range.
        @param request_dict: request parameters (``start_time``, ``end_time``,
                             optional ``campaign_id`` as a JSON list of ids)
        @param response: response object
        @return: response containing ``list``
        """
        campaign_ids = json.loads(request_dict.get('campaign_id', '[]'))
        start_time = request_dict.get('start_time', None)
        end_time = request_dict.get('end_time', None)
        if not all([start_time, end_time]):
            return response.json(444)
        try:
            log_qs = OpenScreenCampaign.objects.filter(update_time__range=[int(start_time), int(end_time)])
            if campaign_ids:
                log_qs = log_qs.filter(
                    campaign_id__in=campaign_ids,
                    campaign_id__status__in=[0, 1],
                    campaign_id__campaign_type__contains=[1]
                )
            log_qs = (log_qs.
                      select_related('campaign_id').
                      values('id', 'user_id', 'status', 'update_time', 'create_time', 'campaign_id',
                             'campaign_id__campaign_name', 'campaign_id__campaign_type', 'campaign_id__status',
                             'campaign_id__campaign_start_date', 'campaign_id__campaign_end_date',
                             'campaign_id__campaign_show_stime', 'campaign_id__campaign_show_etime'))

            # Flatten the joined column names into the keys the client expects.
            campaigns_list = [
                {
                    'id': row['id'],
                    'user_id': row['user_id'],
                    'status': row['status'],
                    'update_time': row['update_time'],
                    'create_time': row['create_time'],
                    'campaign_id': row['campaign_id'],
                    'campaign_name': row['campaign_id__campaign_name'],
                    'campaign_type': 1,
                    'campaign_status': row['campaign_id__status'],
                    'start_date': row['campaign_id__campaign_start_date'],
                    'end_date': row['campaign_id__campaign_end_date'],
                    'start_time': row['campaign_id__campaign_show_stime'],
                    'end_time': row['campaign_id__campaign_show_etime']
                }
                for row in log_qs
            ]
            return response.json(0, {'list': campaigns_list})
        except Exception as e:
            return self._error_response(response, e)

    def get_country_list(self, response):
        """
        Return the country names configured for the region this server serves.
        @param response: response object
        @return: response containing ``list`` (173 when no country matches)
        """
        try:
            region_api_by_server = {
                'Ansjer.us_config.formal_settings': 'https://www.dvema.com/',
                'Ansjer.eur_config.formal_settings': 'https://api.zositeche.com/',
                'Ansjer.cn_config.formal_settings': 'https://www.zositechc.cn/',
            }
            # Any other server type falls back to the test API.
            region_api = region_api_by_server.get(SERVER_TYPE, 'https://test.zositechc.cn/')
            country_list = list(
                CountryModel.objects.filter(region__api=region_api).values_list('country_name', flat=True))
            if not country_list:
                return response.json(173)
            return response.json(0, {'list': country_list})
        except Exception as e:
            return self._error_response(response, e)

    @staticmethod
    def upload_files_to_s3(files, path_prefix):
        """
        Upload campaign image files to the S3 bucket.
        @param files: one uploaded file or a list of them (from request.FILES)
        @param path_prefix: folder under ``app/campaign/`` to store into
        @return image_urls: list of stored paths relative to ``app/campaign/``,
                            or None when the upload failed
        """
        try:
            _, regin, AWS_SES_ACCESS_REGION = CampaignView.s3_server()
            # Accept both a single file and a list of files.
            if not isinstance(files, list):
                files = [files]

            image_urls = []
            s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[regin], AWS_SECRET_ACCESS_KEY[regin], AWS_SES_ACCESS_REGION)
            for file in files:
                timestamp = int(time.time())
                # The random token keeps same-named files uploaded within the
                # same second (e.g. poster and pad poster) from overwriting
                # each other.
                token = uuid.uuid4().hex[:8]
                relative_path = f"{path_prefix}/{timestamp}_{token}_{file.name}"
                s3.upload_file_obj(CampaignView.S3_BUCKET, f'app/campaign/{relative_path}', file,
                                   {'ContentType': file.content_type, 'ACL': 'public-read'})
                image_urls.append(relative_path)
            return image_urls
        except Exception as e:
            LOGGER.info('存储桶添加异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return None

    @staticmethod
    def del_file_to_s3(path_list):
        """
        Delete campaign image files from the S3 bucket (best effort: failures
        are logged, not raised).
        @param path_list: a single path or a list of paths relative to
                          ``app/campaign/``; empty/None entries are ignored
        @return
        """
        try:
            if not isinstance(path_list, list):
                path_list = [path_list]
            # Skip empty paths: they would address the bare "app/campaign/" key.
            paths = [path for path in path_list if path]
            if not paths:
                return
            _, regin, AWS_SES_ACCESS_REGION = CampaignView.s3_server()
            s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[regin], AWS_SECRET_ACCESS_KEY[regin], AWS_SES_ACCESS_REGION)
            for path in paths:
                s3.delete_obj(CampaignView.S3_BUCKET, f"app/campaign/{path}")
        except Exception as e:
            LOGGER.info('存储桶删除异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def s3_server():
        """
        Select the S3 endpoint settings matching the current SERVER_TYPE.
        @return: tuple (public base URL, index into the AWS key lists, region name)
        """
        if SERVER_TYPE == 'Ansjer.cn_config.formal_settings' or SERVER_TYPE == 'Ansjer.cn_config.test_settings':
            s3_url = "https://ansjerfilemanager.s3.cn-northwest-1.amazonaws.com.cn/app/campaign/"
            regin = 0
            AWS_SES_ACCESS_REGION = "cn-northwest-1"
        else:
            s3_url = "https://ansjerfilemanager.s3.amazonaws.com/app/campaign/"
            regin = 1
            AWS_SES_ACCESS_REGION = 'us-east-1'
        return s3_url, regin, AWS_SES_ACCESS_REGION

    @staticmethod
    def set_status(request_dict, response):
        """
        Edit an existing user status row, or create one for a user.

        With ``userSetId`` and ``status`` the existing row is updated. Otherwise
        ``username`` (matched against userID, e-mail, phone or username) selects
        a user for whom a row with the restriction's default status is created.
        @param request_dict: request parameters
        @param response: response object
        @return: response object (444 missing parameters, 173 user or
                 restriction not found, 174 row already exists)
        """
        try:
            user_status_id = request_dict.get("userSetId", None)
            username = request_dict.get("username", None)
            status = request_dict.get("status", None)
            now_time = int(time.time())

            if user_status_id and status:
                UserSetStatus.objects.filter(pk=user_status_id).update(status=status, updated_time=now_time)
                return response.json(0)

            if not username:
                return response.json(444)

            region_restriction = RegionRestriction.objects.filter(statusName="splashAdSwitchStatus").first()
            if region_restriction is None:
                # Without the restriction row there is nothing to attach the
                # user status to.
                return response.json(173)

            user = Device_User.objects.filter(
                Q(userID=username) | Q(userEmail=username) | Q(phone=username) | Q(username=username)).first()
            if not user:
                return response.json(173)
            if UserSetStatus.objects.filter(user_id=user.userID, region_restriction_id=region_restriction).exists():
                return response.json(174)

            UserSetStatus.objects.create(user_id=user.userID, status=region_restriction.default_status,
                                         region_restriction_id=region_restriction,
                                         created_time=now_time, updated_time=now_time)
            return response.json(0)
        except Exception as e:
            return CampaignView._error_response(response, e)

    @staticmethod
    def get_user_set_status_list(request_dict, response):
        """
        Return one page of user status rows for the splash-ad switch.
        @param request_dict: request parameters (optional filters ``userID``,
                             ``email``, ``phone``, ``username``; ``page``, ``pageSize``)
        @param response: response object
        @return: response containing ``list`` and ``total``
        """
        try:
            user_id = request_dict.get("userID", None)
            email = request_dict.get("email", None)
            phone = request_dict.get("phone", None)
            username = request_dict.get("username", None)
            # Guard against zero/negative values, which would produce
            # unsupported negative slice bounds below.
            page = max(int(request_dict.get("page", 1)), 1)
            page_size = max(int(request_dict.get("pageSize", 10)), 1)
            empty_result = {'list': [], 'total': 0}

            region_restriction = RegionRestriction.objects.filter(statusName="splashAdSwitchStatus").first()
            user_set_qs = UserSetStatus.objects.filter(region_restriction_id=region_restriction)
            if user_id:
                user_set_qs = user_set_qs.filter(user_id=user_id)
            for lookup_field, lookup_value in (('userEmail', email), ('phone', phone), ('username', username)):
                if not lookup_value:
                    continue
                matched_user = Device_User.objects.filter(**{lookup_field: lookup_value}).first()
                if matched_user is None:
                    # A filter value that matches nobody yields no rows rather
                    # than being ignored.
                    return response.json(0, empty_result)
                user_set_qs = user_set_qs.filter(user_id=matched_user.userID)

            total_count = user_set_qs.count()
            if total_count == 0:
                return response.json(0, empty_result)

            page_rows = list(user_set_qs[(page - 1) * page_size:page * page_size])
            # Load all users of this page in one query instead of one per row.
            users_by_id = {
                user.userID: user
                for user in Device_User.objects.filter(userID__in=[row.user_id for row in page_rows])
            }
            # Every row was filtered on this restriction, so its name is shared.
            status_name = region_restriction.statusName if region_restriction else None

            user_set_list = []
            for user_set in page_rows:
                user = users_by_id.get(user_set.user_id)
                user_set_list.append({
                    'userSetId': user_set.id,
                    'userID': user_set.user_id,
                    'statusName': status_name,
                    'setStatus': user_set.status,
                    # The referenced user may no longer exist.
                    'username': user.username if user else '',
                    'phone': user.phone if user else '',
                    'email': user.userEmail if user else '',
                    'createdTime': user_set.created_time,
                    'updatedTime': user_set.updated_time,
                })
            return response.json(0, {
                'list': user_set_list,
                'total': total_count,
            })
        except Exception as e:
            return CampaignView._error_response(response, e)