#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: AnsjerFormal
@software: PyCharm
@DATE: 2024-07-29 15:51:04
@Version: python3.6
@MODIFY RECORD:ansjer dev
"""
import ast
import time

from django.db import transaction
from django.views.generic.base import View

from Ansjer.config import SECRET_ACCESS_KEY, ACCESS_KEY_ID, REGION_NAME
from Model.models import FreeEvaluationActivity, ActivityTime, ActivityUser
from Object.AWS.AmazonS3Util import AmazonS3Util
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject


class EvaluationActivityView(View):
    """Free-evaluation activity endpoints.

    GET and POST are both routed through :meth:`validation`, which checks the
    ``HTTP_AUTHORIZATION`` token and then dispatches on the ``operation`` URL
    keyword argument.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET: parameters are read from ``request.GET``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: parameters are read from ``request.POST``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Check the request token and dispatch to the handler for ``operation``.

        :param request_dict: ``request.GET`` or ``request.POST``.
        :param request: the Django request (needed for headers and uploaded files).
        :param operation: operation name taken from the URL kwargs.
        :return: the handler's response; ``response.json(tko.code)`` when the
            token object reports a non-zero code; ``response.json(404)`` for an
            unknown operation.
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')
        tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'), returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang
        user_id = tko.userID

        if operation == 'getActivity':
            return self.get_activity(user_id, request_dict, response)
        if operation == 'getActivityList':
            return self.get_activity_list(user_id, request_dict, response)
        if operation == 'addOrEditActivity':
            return self.add_or_edit_activity(request, request_dict, response)
        if operation == 'getActivityUser':
            return self.get_user_list(request_dict, response)
        if operation == 'addActivityUser':
            return self.add_activity_user(user_id, request_dict, response)
        if operation == 'editActivityUser':
            return self.edit_activity_user(request_dict, response)
        return response.json(404)

    @staticmethod
    def _server_error(response, exc):
        """Build the 500 response carrying the failing line number and ``repr(exc)``."""
        return response.json(500, 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc)))

    @staticmethod
    def get_activity(user_id, request_dict, response):
        """Return one activity flagged ``is_show=1`` with its timeline and sign-up state.

        The payload adds ``activity_process`` (timeline nodes ordered by
        ``sort``), ``activity_start_time`` / ``activity_end_time`` (start of the
        first node and end of the last node, ``None`` when there is no
        timeline), ``user_count``, ``activity_user`` (rows with
        ``is_selected=1``) and ``activity_status``.

        :param user_id: id of the requesting user, used to detect an existing sign-up.
        :param request_dict: request parameters (not read by this handler).
        :param response: response builder.
        :return: ``response.json(0, activity)``, ``response.json(0, {})`` when no
            activity is shown, or a 500 response on any exception.
        """
        try:
            now_time = int(time.time())
            shown = list(
                FreeEvaluationActivity.objects.filter(is_show=1).values(
                    'activity_name', 'carousel_image_url', 'details_image_url', 'issue', 'id',
                    'product_number', 'original_price')[:1]
            )
            if not shown:
                return response.json(0, {})
            activity = shown[0]

            # Evaluate the timeline once; an activity without any node used to
            # fail here because first()/last() return None on an empty queryset.
            process = list(
                ActivityTime.objects.filter(activity_id=activity['id'])
                .values('node_content', 'start_time', 'end_time')
                .order_by('sort')
            )
            activity['activity_start_time'] = process[0]['start_time'] if process else None
            activity['activity_end_time'] = process[-1]['end_time'] if process else None
            activity['activity_process'] = process

            user_qs = ActivityUser.objects.filter(activity_id=activity['id'])
            activity['user_count'] = user_qs.count()
            # activity_status -- 1: open for registration; 2: already registered;
            # 3: registration closed; 4: activity ended
            activity['activity_status'] = 2 if user_qs.filter(user_id=user_id).exists() else 1
            activity['activity_user'] = list(user_qs.filter(is_selected=1).values('user_name', 'phone'))

            if process:
                # NOTE(review): the end of the first node is treated as the
                # registration deadline, and it overrides status 2 -- confirm
                # that registered users should also see 3 after the deadline.
                if now_time > process[0]['end_time']:
                    activity['activity_status'] = 3
                if now_time > process[-1]['end_time']:
                    activity['activity_status'] = 4
            return response.json(0, activity)
        except Exception as e:
            return EvaluationActivityView._server_error(response, e)

    @staticmethod
    def add_activity_user(user_id, request_dict, response):
        """Create or update the requesting user's sign-up for an activity.

        All of ``activity_id``, ``user_name``, ``phone``, ``address``, ``sex``,
        ``age``, ``usage_environment`` and ``is_reports`` must be present and
        non-empty, otherwise ``response.json(404)`` is returned.

        :param user_id: id of the requesting user.
        :param request_dict: request parameters.
        :param response: response builder.
        :return: ``response.json(0)`` on success, or a 500 response on any exception.
        """
        activity_id = request_dict.get('activity_id', None)
        user_name = request_dict.get('user_name', None)
        phone = request_dict.get('phone', None)
        address = request_dict.get('address', None)
        sex = request_dict.get('sex', None)
        age = request_dict.get('age', None)
        usage_environment = request_dict.get('usage_environment', None)
        is_reports = request_dict.get('is_reports', None)
        if not all([activity_id, user_name, phone, address, sex, age, usage_environment, is_reports]):
            return response.json(404)

        now_time = int(time.time())
        try:
            signup_qs = ActivityUser.objects.filter(user_id=user_id, activity_id=activity_id)
            if signup_qs.exists():
                signup_qs.update(phone=phone, address=address, sex=sex, user_name=user_name,
                                 is_reports=is_reports, age=age, usage_environment=usage_environment,
                                 update_time=now_time)
            else:
                ActivityUser.objects.create(activity_id=activity_id, user_id=user_id, phone=phone,
                                            address=address, sex=sex, age=age,
                                            usage_environment=usage_environment, user_name=user_name,
                                            is_reports=is_reports, created_time=now_time,
                                            update_time=now_time)
            return response.json(0)
        except Exception as e:
            return EvaluationActivityView._server_error(response, e)

    @staticmethod
    def add_or_edit_activity(request, request_dict, response):
        """Create an activity, or edit the one identified by ``activity_id``.

        Uploaded ``carousel_image`` / ``details_image`` files are stored in S3
        and their public URLs saved on the activity. ``activity_process`` is a
        Python-literal list of dicts with ``node_content``, ``start_time`` and
        ``end_time``; the list position becomes each node's ``sort``.

        :param request: the Django request (source of the uploaded files).
        :param request_dict: request parameters.
        :param response: response builder.
        :return: ``response.json(0)`` on success; ``response.json(404)`` when a
            required parameter is missing, ``activity_process`` cannot be
            parsed, or a new activity lacks either image; a 500 response on any
            other exception.
        """
        activity_id = request_dict.get('activity_id', None)
        activity_name = request_dict.get('activity_name', None)
        carousel_image = request.FILES.get('carousel_image', None)
        details_image = request.FILES.get('details_image', None)
        issue = request_dict.get('issue', None)
        product_number = request_dict.get('product_number', None)
        original_price = request_dict.get('original_price', None)
        activity_process = request_dict.get('activity_process', None)
        is_show = request_dict.get('is_show', None)
        if not all([activity_name, issue, product_number, original_price, is_show, activity_process]):
            return response.json(404)
        # A new activity needs both images; reject before anything is uploaded
        # so a rejected request leaves no orphan object in the bucket.
        if not activity_id and not all([carousel_image, details_image]):
            return response.json(404)

        # NOTE(security): this value comes straight from the request and used to
        # be passed to eval(), which executes arbitrary code. literal_eval only
        # accepts Python literals, which covers the list-of-dicts payload.
        try:
            activity_process = ast.literal_eval(activity_process)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return response.json(404)
        if not isinstance(activity_process, (list, tuple)):
            return response.json(404)

        now_time = int(time.time())
        try:
            bucket = "ansjerfilemanager"
            s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
            s3_url = 'https://{}.s3.{}.amazonaws.com.cn/'.format(bucket, REGION_NAME)

            # Model field name -> (uploaded file, S3 key template keyed by issue).
            uploads = (
                ('carousel_image_url', carousel_image, '前端/EvaluationActivity/carousel_image_{}.jpg'),
                ('details_image_url', details_image, '前端/EvaluationActivity/details_image_{}.jpg'),
            )
            image_urls = {}
            for field_name, image, path_template in uploads:
                if not image:
                    continue
                image_path = path_template.format(issue)
                s3_obj.upload_file_obj(
                    bucket,
                    image_path,
                    image,
                    {"ContentType": image.content_type, "ACL": "public-read"},
                )
                image_urls[field_name] = s3_url + image_path

            with transaction.atomic():
                if activity_id:
                    # Edit: one UPDATE carries the scalar fields plus whichever
                    # image URLs were re-uploaded.
                    # NOTE(review): activity_name is required above but is not
                    # written on edit -- confirm whether that is intended.
                    FreeEvaluationActivity.objects.filter(id=activity_id).update(
                        issue=issue, product_number=product_number, original_price=original_price,
                        is_show=is_show, update_time=now_time, **image_urls)
                    for index, item in enumerate(activity_process):
                        # Timeline nodes are matched by their text within the activity.
                        time_qs = ActivityTime.objects.filter(activity_id=activity_id,
                                                              node_content=item['node_content'])
                        if time_qs.exists():
                            time_qs.update(start_time=item['start_time'], end_time=item['end_time'],
                                           sort=index)
                        else:
                            ActivityTime.objects.create(activity_id=activity_id,
                                                        node_content=item['node_content'],
                                                        start_time=item['start_time'],
                                                        end_time=item['end_time'], sort=index)
                else:
                    # Create: the timeline must reference the row just created;
                    # the request's activity_id is empty in this branch.
                    activity = FreeEvaluationActivity.objects.create(
                        activity_name=activity_name, product_number=product_number, is_show=is_show,
                        original_price=original_price, issue=issue, **image_urls)
                    for index, item in enumerate(activity_process):
                        ActivityTime.objects.create(activity_id=activity.id,
                                                    node_content=item['node_content'],
                                                    start_time=item['start_time'],
                                                    end_time=item['end_time'], sort=index)
            return response.json(0)
        except Exception as e:
            return EvaluationActivityView._server_error(response, e)

    @staticmethod
    def get_activity_list(user_id, request_dict, response):
        """Return every activity, each with its timeline under ``activity_process``.

        :param user_id: id of the requesting user (not read by this handler).
        :param request_dict: request parameters (not read by this handler).
        :param response: response builder.
        :return: ``response.json(0, [activity, ...])``, or a 500 response on any exception.
        """
        try:
            activities = list(
                FreeEvaluationActivity.objects.all().values(
                    'id', 'activity_name', 'carousel_image_url', 'details_image_url', 'issue',
                    'is_show', 'product_number', 'original_price')
            )
            for activity in activities:
                activity['activity_process'] = list(
                    ActivityTime.objects.filter(activity_id=activity['id'])
                    .values('node_content', 'start_time', 'end_time')
                    .order_by('sort')
                )
            return response.json(0, activities)
        except Exception as e:
            return EvaluationActivityView._server_error(response, e)

    @staticmethod
    def get_user_list(request_dict, response):
        """Return sign-up rows, restricted to ``activity_id`` when one is supplied.

        :param request_dict: request parameters; ``activity_id`` is optional.
        :param response: response builder.
        :return: ``response.json(0, [user, ...])``, or a 500 response on any exception.
        """
        activity_id = request_dict.get('activity_id', None)
        try:
            user_qs = ActivityUser.objects.all()
            if activity_id:
                user_qs = user_qs.filter(activity_id=activity_id)
            users = user_qs.values('id', 'user_name', 'phone', 'age', 'address', 'sex', 'is_selected',
                                   'activity_id', 'usage_environment', 'is_reports')
            return response.json(0, list(users))
        except Exception as e:
            return EvaluationActivityView._server_error(response, e)

    @staticmethod
    def edit_activity_user(request_dict, response):
        """Set ``is_selected`` on one sign-up row.

        The ``user_id`` parameter is matched against the ``ActivityUser`` row's
        own ``id`` column, i.e. the ``id`` values returned by
        :meth:`get_user_list`.

        :param request_dict: request parameters ``activity_id``, ``user_id``, ``is_selected``.
        :param response: response builder.
        :return: ``response.json(0)`` on success, ``response.json(404)`` when a
            parameter is absent, or a 500 response on any exception.
        """
        activity_id = request_dict.get('activity_id', None)
        user_id = request_dict.get('user_id', None)
        is_selected = request_dict.get('is_selected', None)
        # Absent parameters would otherwise run the UPDATE with None values.
        if any(value is None for value in (activity_id, user_id, is_selected)):
            return response.json(404)
        try:
            ActivityUser.objects.filter(activity_id=activity_id, id=user_id).update(is_selected=is_selected)
            return response.json(0)
        except Exception as e:
            return EvaluationActivityView._server_error(response, e)