import json import logging import time from django.db import transaction from django.views.generic.base import View from Ansjer.config import SERVER_TYPE from Model.models import AppSetModel, PromotionRuleModel, PopupsConfig, RedDotsConfig, Device_Info, UidSetModel, \ UserOperationLog, Order_Model, IPAddr, RegionRestriction, UserSetStatus from Object.IPWeatherObject import IPQuery from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Object.utils import LocalDateTimeUtil from Service.CommonService import CommonService from Service.ModelService import ModelService LOGGER = logging.getLogger('info') class AppSetView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation', None) return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation', None) return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'query': return self.do_query(request_dict, response) token = request_dict.get('token', None) tko = TokenObject(token) if tko.code != 0: return response.json(tko.code) user_id = tko.userID if operation == 'page_set': # app弹窗标记红点设置 return self.do_page_set(user_id, request_dict, response) elif operation == 'ai-preview': return self.save_user_popups_log(user_id, request_dict, response) elif operation == 'admin_query': return self.do_admin_query(user_id, request_dict, response) elif operation == 'admin_update': return self.do_admin_update(user_id, request_dict, response) elif operation == 'statusByIp': return self.status_by_ip(user_id, request, response) elif operation == 'userSetAdStatus': return self.user_set_ad_status(user_id, request_dict, response) else: return response.json(414) @staticmethod def do_query(request_dict, response): """ 查询app配置 @param request_dict: 请求数据 @request_dict lang: 语言 @request_dict appBundleId: app包id @param response: 响应 @return: response """ lang = request_dict.get('lang', None) appBundleId = request_dict.get('appBundleId', None) if not appBundleId: return response.json(444, 'appBundleId') app_set_qs = AppSetModel.objects.filter(appBundleId=appBundleId).values('content') if not app_set_qs.exists(): return response.json(173) try: if not app_set_qs[0]['content']: return response.json(0) dict_json = json.loads(app_set_qs[0]['content']) # 加入促销弹窗 promotion = PromotionRuleModel.objects.filter(status=1).values('startTime', 'endTime', 'popups') if promotion.exists(): dict_json['popupsStartTime'] = promotion[0]['startTime'] dict_json['popupsEndTime'] = promotion[0]['endTime'] dict_json['popupsContent'] = json.loads(promotion[0]['popups']).get(lang, '') dict_json['nowTime'] = int(time.time()) if 'editionUpgrading' in dict_json: dict_json['editionUpgrading'] = '' if dict_json['editionUpgrading'] == 1: if lang == 'cn': dict_json['editionUpgrading'] = '正在升级,请稍后登录' else: dict_json['editionUpgrading'] = 'Upgrading, please sign in later' return response.json(0, dict_json) except Exception as e: return response.json(500, '错误行数:{errLine}, 错误信息: {errmsg}'.format(errLine=e.__traceback__.tb_lineno, errmsg=repr(e))) def do_admin_query(self, userID, request_dict, response): # 查询和添加权限 own_perm = ModelService.check_perm(userID, 40) if not own_perm: return response.json(404) appBundleId = request_dict.get('appBundleId', None) sm_qs = AppSetModel.objects.filter(appBundleId=appBundleId) count = sm_qs.count() nowTime = int(time.time()) if count > 0: sm_qs = sm_qs.values('id', 'appBundleId', 'content', 'addTime', 'updTime') return response.json(0, {'data': list(sm_qs), 'count': count}) else: AppSetModel.objects.create( appBundleId=appBundleId, addTime=nowTime, updTime=nowTime ) return response.json(0) def do_admin_update(self, userID, request_dict, response): # 修改的权限 own_perm = ModelService.check_perm(userID, 50) if not own_perm: return response.json(404) appBundleId = request_dict.get('appBundleId', None) content = request_dict.get('content', None) nowTime = int(time.time()) sm_qs = AppSetModel.objects.filter(appBundleId=appBundleId) redis = RedisObject() if SERVER_TYPE != "Ansjer.formal_settings": key_id = "www" + appBundleId else: key_id = "test" + appBundleId redis.del_data(key=key_id) if sm_qs.exists(): sm_qs.update(content=content, updTime=nowTime) return response.json(0) else: return response.json(173) @staticmethod def do_page_set(userID, request_dict, response): """ 初始化加载红点以及弹窗数据 """ try: lang = request_dict.get('lang', 'en') dict_json = {} now_time = int(time.time()) dict_json['popups'] = { 'title': '', 'content': '', 'status': 0, 'tag': 1, } with transaction.atomic(): # AI弹窗 dict_json['aiPopups'] = AppSetView.get_ai_init_data(userID, lang) # 弹窗 popups_obj = PopupsConfig.objects.filter(lang=lang).values('title', 'content', 'start_time', 'end_time', 'tag') if popups_obj.exists(): popups_status = 0 if popups_obj[0]['start_time'] <= now_time <= popups_obj[0]['end_time']: popups_status = 1 dict_json['popups'] = { 'title': popups_obj[0]['title'], 'content': popups_obj[0]['content'], 'status': popups_status, 'tag': popups_obj[0]['tag'], } # 红点标记 dict_json['red_dots'] = [] red_dots_obj = RedDotsConfig.objects.values('module', 'start_time', 'end_time') is_show_red_dots = AppSetView.check_user_is_show_red_dot(userID) # 是否显示红点 for red_dots in red_dots_obj: red_dots_status = 0 if red_dots['start_time'] <= now_time <= red_dots['end_time']: red_dots_status = 1 ai_detection = red_dots['module'] if ai_detection == 'ai_detects_purchases': red_dots_status = 1 if is_show_red_dots else 0 dict_json['red_dots'].append({ 'module': red_dots['module'], 'status': red_dots_status, }) dict_json['red_dots'] = list(dict_json['red_dots']) return response.json(0, dict_json) except Exception as e: LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500) @staticmethod def get_ai_init_data(user_id, lang): """ 初始化获取AI弹窗数据 @param user_id: 用户id @param lang: 语言 @return: popups_qs """ now_time = int(time.time()) popups_qs = PopupsConfig.objects.filter(tag=2, lang=lang) \ .values('title', 'content', 'start_time', 'end_time', 'tag') if not popups_qs.exists(): return {} ai_device = AppSetView.get_user_ai_device(user_id) if not ai_device: return {} # 当前时间小于弹窗开始时间或者大于结束时间 则返回空字符串 if not popups_qs[0]['start_time'] <= now_time <= popups_qs[0]['end_time']: return {} user_ai_log_qs = UserOperationLog.objects.filter(user_id=user_id, type=2).values('created_time') user_log = {'user_id': user_id, 'status': 1, 'type': 2, 'created_time': now_time, 'updated_time': now_time} popups_status = 0 # 用户有AI设备 没有操作过弹窗则显示 if not user_ai_log_qs.exists(): popups_status = 1 UserOperationLog.objects.create(**user_log) else: now_date = int(LocalDateTimeUtil.time_stamp_to_time(now_time, '%Y%m%d')) created_date = int(LocalDateTimeUtil.time_stamp_to_time(user_ai_log_qs[0]['created_time'], '%Y%m%d')) if user_ai_log_qs.count() == 1 and now_date > created_date: popups_status = 1 UserOperationLog.objects.create(**user_log) return { 'title': popups_qs[0]['title'], 'content': popups_qs[0]['content'], 'status': popups_status, 'tag': popups_qs[0]['tag'], } @staticmethod def get_user_ai_device(user_id): """ 获取用户设备是否有有支持AI功能 @param user_id: 用户ID @return: True|False """ device_info = Device_Info.objects.filter(userID_id=user_id, isExist=1).values('UID') if not device_info.exists(): return False uid_list = [] for item in device_info: uid_list.append(item['UID']) uid_info_qs = UidSetModel.objects.filter(uid__in=uid_list).values('is_ai') if not uid_info_qs.exists(): return False if 1 or 0 in uid_info_qs: return True return False @staticmethod def check_user_is_show_red_dot(user_id): """ 获取用户是否显示红点 用户体验过AI免费套餐不显示 OR 用户操作记录阅读过AI介绍界面不显示 @param user_id: 用户ID @return: True | False """ order_qs = Order_Model.objects.filter(userID_id=user_id, order_type=2, status=1, payType=10) ai_red_dot_qs = UserOperationLog.objects.filter(user_id=user_id, type=4) return not ai_red_dot_qs.exists() and not order_qs.exists() @classmethod def save_user_popups_log(cls, user_id, request_dict, response): """ 保存用户预览AI介绍页面记录 @param request_dict: type @param user_id: 用户id @param response: 响应对象 """ try: rq_type = request_dict.get('type', 0) now_time = int(time.time()) user_log = {'user_id': user_id, 'status': 1, 'type': int(rq_type), 'created_time': now_time, 'updated_time': now_time} UserOperationLog.objects.create(**user_log) except Exception as e: LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500) return response.json(0) @staticmethod def status_by_ip(user_id, request, response): """ 根据ip解析或用户设置返回状态 """ try: ip = CommonService.get_ip_address(request) ip_addr_qs = IPAddr.objects.filter(ip=ip, is_geoip2=False).values('country_code', 'city', 'region', 'district') if ip_addr_qs.exists(): ip_info = ip_addr_qs[0] else: ip_qs = IPQuery(ip) ip_info = { 'country_code': ip_qs.country_id, 'region': ip_qs.region, 'city': ip_qs.city, 'district': ip_qs.district } country_code = ip_info['country_code'] region = ip_info['region'] city = ip_info['city'] district = ip_info['district'] restrictions_qs = RegionRestriction.objects.all() user_set_status_qs = UserSetStatus.objects.filter(user_id=user_id) response_data = {} for restriction in restrictions_qs: response_data[restriction.statusName] = 0 user_set = user_set_status_qs.filter(region_restriction_id=restriction.id).first() # 用户控制 if user_set: response_data[restriction.statusName] = user_set.status # 地区控制 elif ((not restriction.country_code or country_code in restriction.country_code) and (not restriction.region or region in restriction.region) and (not restriction.city or city in restriction.city) and (not restriction.district or district in restriction.district)): # 返回值定义具体看表的content字段 response_data[restriction.statusName] = 1 LOGGER.info(f"请求ip地址为 {ip}, 解析为 {country_code}, {region}, {city}, {district}") return response.json(0, response_data) except Exception as e: LOGGER.info('根据ip解析地址返回状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500) @staticmethod def user_set_ad_status(user_id, request_dict, response): """ 用户设置广告状态 """ try: splash_ad_status = request_dict.get("splashAdStatus") user_set_status_qs = UserSetStatus.objects.filter(user_id=user_id) if not user_set_status_qs.exists(): UserSetStatus.objects.create(user_id=user_id, status=splash_ad_status) else: user_set_status_qs.update(status=splash_ad_status) return response.json(0) except Exception as e: LOGGER.info('设置用户广告状态异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500)