# -*- encoding: utf-8 -*-
"""
@File : AlgorithmShopManageController.py
@Time : 2023/7/25 9:48
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import json
import time
from decimal import Decimal

from django.core.paginator import Paginator
from django.views import View

from Model.models import DeviceAlgorithmExplain, DeviceAlgorithmType
from Object.AWS.AmazonS3Util import AmazonS3Util
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Ansjer.config import CONFIG_INFO, CONFIG_EUR, CONFIG_US, CONFIG_TEST, CONFIG_CN, AWS_ACCESS_KEY_ID, \
    AWS_SECRET_ACCESS_KEY, AWS_SES_ACCESS_REGION


class AlgorithmShopManageView(View):
    """Management endpoints for algorithm-shop entries and their translations.

    GET and POST requests are routed by the ``operation`` URL keyword to one
    of ``update``, ``query``, ``save``, ``delete`` or ``edit``.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        api_version = kwargs.get('apiVersion')
        return self.validation(request.GET, request, operation, api_version)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        api_version = kwargs.get('apiVersion')
        return self.validation(request.POST, request, operation, api_version)

    def validation(self, request_dict, request, operation, api_version='v1'):
        """Check the authorization token, then route to the handler for ``operation``.

        Returns the token's own code when it is non-zero, and 404 for an
        unrecognised operation.
        """
        token = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
        response = ResponseObject()
        if token.code != 0:
            return response.json(token.code)
        # Back-office management: handlers answer through a 'pc' response object.
        response = ResponseObject(returntype='pc')
        if operation == 'update':
            return self.algorithm_update(request_dict, response, api_version)
        elif operation == 'query':
            return self.algorithm_query(request_dict, response)
        elif operation == 'save':
            return self.algorithm_save(request, request_dict, response)
        elif operation == 'delete':
            return self.algorithm_delete(request_dict, response)
        elif operation == 'edit':
            return self.algorithm_edit(request, request_dict, response)
        else:
            return response.json(404)

    @staticmethod
    def _to_price(raw_price):
        """Convert a raw price value to ``Decimal``; empty/zero values become 0."""
        if not raw_price:
            return Decimal(0)
        if isinstance(raw_price, float):
            # Go through str so a JSON float such as 9.99 does not carry
            # binary floating-point noise into the Decimal.
            raw_price = str(raw_price)
        return Decimal(raw_price)

    @classmethod
    def _build_explain_instances(cls, algorithm_type_id, lang_configs):
        """Build unsaved ``DeviceAlgorithmExplain`` rows from translation configs.

        Each config must provide the keys ``title``, ``subtitle``, ``price``,
        ``introduction``, ``installExplain``, ``concerning``, ``riskWarning``
        and ``lang``; a missing key raises ``KeyError``.
        """
        now_time = int(time.time())
        return [
            DeviceAlgorithmExplain(
                algorithm_type_id=int(algorithm_type_id),  # id of the related DeviceAlgorithmType
                title=lang_config['title'],
                subtitle=lang_config['subtitle'],
                price=cls._to_price(lang_config['price']),
                introduction=lang_config['introduction'],
                install_explain=lang_config['installExplain'],
                concerning=lang_config['concerning'],
                risk_warning=lang_config['riskWarning'],
                lang=lang_config['lang'],
                updated_time=now_time,
                created_time=now_time,
            )
            for lang_config in lang_configs
        ]

    @classmethod
    def algorithm_update(cls, request_dict, response, api_version):
        """Update the translated texts of one algorithm for one language.

        Requires ``aId`` and ``lang`` (444 when missing); returns 173 when no
        matching translation row exists. Only non-empty text parameters are
        written.
        """
        try:
            a_id = request_dict.get('aId', None)
            lang = request_dict.get('lang', None)
            if not all([a_id, lang]):
                # Same missing-parameter code the other handlers in this view use.
                return response.json(444)
            a_explain_qs = DeviceAlgorithmExplain.objects.filter(algorithm_type_id=int(a_id), lang=lang)
            if not a_explain_qs.exists():
                return response.json(173)
            # (request parameter, model field) pairs that may be updated.
            updatable_fields = (
                ('title', 'title'),
                ('subtitle', 'subtitle'),
                ('introduction', 'introduction'),
                ('installExplain', 'install_explain'),
                ('concerning', 'concerning'),
                ('riskWarning', 'risk_warning'),
            )
            data = {}
            for param_name, field_name in updatable_fields:
                value = request_dict.get(param_name, None)
                if value:
                    data[field_name] = value
            if data:
                a_explain_qs.update(**data)
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def algorithm_query(cls, request_dict, response):
        """Return a page of algorithm types with their translations.

        Optional filters: ``algorithmType``, ``tag``, ``status``. Paging uses
        ``page`` (default 1) and ``pageSize`` (default 10); rows are ordered
        by ``sort`` descending.
        """
        algorithm_type = request_dict.get('algorithmType', None)
        tag = request_dict.get('tag', None)
        status = request_dict.get('status', None)
        page = request_dict.get('page', 1)
        page_size = request_dict.get('pageSize', 10)
        try:
            device_algorithm_type_qs = DeviceAlgorithmType.objects.all()
            if algorithm_type:
                device_algorithm_type_qs = device_algorithm_type_qs.filter(type=algorithm_type)
            if tag:
                device_algorithm_type_qs = device_algorithm_type_qs.filter(tag=tag)
            if status:
                device_algorithm_type_qs = device_algorithm_type_qs.filter(status=status)
            # Pagination: page_size rows per page.
            paginator = Paginator(device_algorithm_type_qs.order_by("-sort"), page_size)
            device_algorithm_type_page = paginator.get_page(page)  # rows of the requested page
            device_algorithm_type_list = []
            for device_algorithm_type in device_algorithm_type_page:
                # Fetch the translations once; list, count and title all derive from it.
                explains = list(DeviceAlgorithmExplain.objects.filter(
                    algorithm_type_id=device_algorithm_type.id))
                device_algorithm_explain_list = [
                    {
                        'algorithmExplainId': explain.id,
                        'lang': explain.lang,
                        'title': explain.title,
                        'subtitle': explain.subtitle,
                        'price': explain.price,
                        'introduction': explain.introduction,
                        'installExplain': explain.install_explain,
                        'concerning': explain.concerning,
                        'riskWarning': explain.risk_warning,
                    }
                    for explain in explains
                ]
                # Prefer the 'cn' title, fall back to the first translation, else empty.
                cn_titles = [explain.title for explain in explains if explain.lang == 'cn']
                if cn_titles:
                    title = cn_titles[0]
                elif explains:
                    title = explains[0].title
                else:
                    title = ""
                device_algorithm_type_list.append({
                    "algorithmId": device_algorithm_type.id,
                    "title": title,
                    "algorithmType": device_algorithm_type.type,
                    "memory": device_algorithm_type.memory,
                    "downCount": device_algorithm_type.down_count,
                    "tag": device_algorithm_type.tag,
                    "status": device_algorithm_type.status,
                    "expireTime": device_algorithm_type.expire_time if device_algorithm_type.status == 1 else 0,
                    "sort": device_algorithm_type.sort,
                    "basicFunction": device_algorithm_type.basic_function,
                    "imageUrl": device_algorithm_type.image_url,
                    "detailsImgUrl": device_algorithm_type.details_img_url,
                    "iconUrl": device_algorithm_type.icon_url,
                    "resource": device_algorithm_type.resource,
                    "deviceAlgorithmExplain": device_algorithm_explain_list,
                    "translationNum": len(explains)
                })
            data = {
                'list': device_algorithm_type_list,
                'total': paginator.count,
            }
            return response.json(0, data)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def algorithm_save(cls, request, request_dict, response):
        """Create a new algorithm type together with its translations.

        Requires ``algorithmType`` (444 when missing); returns 174 when a row
        with that type already exists. ``langExplainConfig`` is a JSON list of
        translation configs; ``imageFile``, ``detailsImgFile`` and ``iconFile``
        are optional uploads.
        """
        algorithm_type = request_dict.get('algorithmType', None)
        memory = request_dict.get('memory', "")
        tag = request_dict.get('tag', 0)
        status = request_dict.get('status', 0)
        expire_time = request_dict.get('expireTime', 0)
        sort = request_dict.get('sort', 0)
        basic_function = request_dict.get('basicFunction', '')
        resource = request_dict.get('resource', None)
        lang_configs = request_dict.get('langExplainConfig', None)
        image_file = request.FILES.get('imageFile', None)
        details_img_file = request.FILES.get('detailsImgFile', None)
        icon_file = request.FILES.get('iconFile', None)
        if not algorithm_type:
            return response.json(444)
        try:
            if lang_configs:
                lang_configs = json.loads(lang_configs)
            else:
                lang_configs = []
            device_algorithm_type_qs = DeviceAlgorithmType.objects.filter(type=algorithm_type)
            if device_algorithm_type_qs.exists():
                return response.json(174)
            # Upload the images to the storage bucket.
            icon_url = cls.upload_image("app/algorithm-shop/icon/", algorithm_type, icon_file)
            image_url = cls.upload_image("app/algorithm-shop/image/", algorithm_type, image_file)
            details_img_url = cls.upload_image("app/algorithm-shop/details-img/", algorithm_type, details_img_file)
            # Expire time is kept only for status 1. Request values are
            # strings, so compare textually instead of against the int 1.
            expire_time = expire_time if str(status) == '1' else 0
            # Create the algorithm type.
            device_algorithm_type = DeviceAlgorithmType.objects.create(
                type=algorithm_type,
                memory=memory,
                tag=tag,
                status=status,
                expire_time=expire_time,
                sort=sort,
                basic_function=basic_function,
                image_url=image_url,
                details_img_url=details_img_url,
                icon_url=icon_url,
                resource=resource,
                created_time=int(time.time()))
            # Create the translation rows.
            explain_instances = cls._build_explain_instances(device_algorithm_type.id, lang_configs)
            DeviceAlgorithmExplain.objects.bulk_create(explain_instances)
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def algorithm_delete(cls, request_dict, response):
        """Delete an algorithm type and all of its translations.

        Requires ``algorithmId`` (444 when missing).
        """
        algorithm_id = request_dict.get('algorithmId', None)
        if not algorithm_id:
            return response.json(444)
        DeviceAlgorithmExplain.objects.filter(algorithm_type_id=algorithm_id).delete()
        DeviceAlgorithmType.objects.filter(id=algorithm_id).delete()
        return response.json(0)

    @classmethod
    def upload_image(cls, directory, algorithm_type, image_file):
        """Upload ``image_file`` to the ``ansjerfilemanager`` bucket and return its URL.

        Returns an empty string when no file is given. The object key is
        ``directory`` followed by ``{CONFIG_INFO}_{algorithm_type}.png``.
        """
        if not image_file:
            return ""
        bucket = 'ansjerfilemanager'
        file_key = directory + f"{CONFIG_INFO}_{algorithm_type}.png"
        if CONFIG_INFO in (CONFIG_TEST, CONFIG_CN, 'local'):
            prefix_url = "https://ansjerfilemanager.s3.cn-northwest-1.amazonaws.com.cn/"
            s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[0], AWS_SECRET_ACCESS_KEY[0], 'cn-northwest-1')
        else:
            prefix_url = "https://ansjerfilemanager.s3.amazonaws.com/"
            s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[1], AWS_SECRET_ACCESS_KEY[1], AWS_SES_ACCESS_REGION)
        s3.upload_file_obj(
            bucket,
            file_key,
            image_file,
            {'ContentType': image_file.content_type, 'ACL': 'public-read'})
        return prefix_url + file_key

    @classmethod
    def algorithm_edit(cls, request, request_dict, response):
        """Edit an existing algorithm type and optionally replace its translations.

        Requires ``algorithmId`` (444 when missing); returns 173 when the row
        does not exist. When ``langExplainConfig`` is supplied, all existing
        translations of the algorithm are replaced by the supplied ones. Other
        fields are only overwritten when a non-empty value is supplied.
        """
        algorithm_id = request_dict.get('algorithmId', None)
        memory = request_dict.get('memory', '')
        tag = request_dict.get('tag', 0)
        status = request_dict.get('status', 0)
        expire_time = request_dict.get('expireTime', 0)
        sort = request_dict.get('sort', 0)
        # 'basicFunctions' is the key this handler has always read; 'basicFunction'
        # is the key algorithm_save reads, so accept it too.
        basic_function = request_dict.get('basicFunction', '') or request_dict.get('basicFunctions', '')
        resource = request_dict.get('resource', None)
        lang_configs = request_dict.get('langExplainConfig', None)
        image_file = request.FILES.get('imageFile', None)
        details_img_file = request.FILES.get('detailsImgFile', None)
        icon_file = request.FILES.get('iconFile', None)
        if not algorithm_id:
            return response.json(444)
        try:
            # filter().first() yields None for a missing row, whereas get() raises.
            device_algorithm_type = DeviceAlgorithmType.objects.filter(pk=algorithm_id).first()
            if not device_algorithm_type:
                return response.json(173)
            algorithm_type = device_algorithm_type.type
            if lang_configs:
                lang_configs = json.loads(lang_configs)
                # Build the new rows first so a malformed payload fails before
                # the existing translations are deleted.
                explain_instances = cls._build_explain_instances(algorithm_id, lang_configs)
                DeviceAlgorithmExplain.objects.filter(algorithm_type_id=algorithm_id).delete()
                DeviceAlgorithmExplain.objects.bulk_create(explain_instances)
            if memory:
                device_algorithm_type.memory = memory
            if tag:
                device_algorithm_type.tag = tag
            if status:
                device_algorithm_type.status = status
            # Request values are strings, so compare textually instead of against the int 1.
            if str(status) == '1' and expire_time:
                device_algorithm_type.expire_time = expire_time
            if sort:
                device_algorithm_type.sort = sort
            if basic_function:
                device_algorithm_type.basic_function = basic_function
            if resource:
                device_algorithm_type.resource = resource
            if image_file:
                image_url = cls.upload_image("app/algorithm-shop/image/", algorithm_type, image_file)
                device_algorithm_type.image_url = image_url
            if icon_file:
                icon_url = cls.upload_image("app/algorithm-shop/icon/", algorithm_type, icon_file)
                device_algorithm_type.icon_url = icon_url
            if details_img_file:
                details_img_url = cls.upload_image("app/algorithm-shop/details-img/", algorithm_type, details_img_file)
                device_algorithm_type.details_img_url = details_img_url
            device_algorithm_type.save()
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))