123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- # -*- coding: utf-8 -*-
- """
- @Author : Rocky
- @Time : 2023/07/13 11:30
- @File :VseesController.py
- """
- import time
- from django.db import transaction
- from django.views import View
- from Ansjer.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SES_ACCESS_REGION
- from Model.models import VseesDeviceType, vseesProductInfo
- from Object.AWS.AmazonS3Util import AmazonS3Util
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
class VseesManagement(View):
    """Django view exposing Vsees device-type and product-info operations.

    The ``operation`` URL keyword argument selects the handler. Two
    operations (``get-device-info`` and ``get-product-info``) are served
    without a token; every other operation requires a valid token in the
    ``Authorization`` header.
    """

    # Base of the public URL stored for uploaded files.
    _CDN_BASE_URL = 'https://d2cjxvw3tr9apc.cloudfront.net/'
    # Storage key prefix per product type that carries an uploaded file
    # (1: manual, 2: firmware). Type 0 (video) stores a link, not a file.
    _UPLOAD_DIRS = {
        1: 'vsees/clarification/',
        2: 'vsees/firmware/',
    }

    def get(self, request, *args, **kwargs):
        """Handle GET: route on ``operation`` using the query-string parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.GET, kwargs.get('operation'), request)

    def post(self, request, *args, **kwargs):
        """Handle POST: route on ``operation`` using the form parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.POST, kwargs.get('operation'), request)

    def validation(self, request_dict, operation, request):
        """Dispatch ``operation`` to its handler, checking the token when required.

        :param request_dict: request parameters (``request.GET`` or ``request.POST``)
        :param operation: operation name taken from the URL
        :param request: the incoming HTTP request (headers and uploaded files)
        :return: the handler's response; the token error code when the token
            check fails; code 404 for an unknown operation
        """
        response = ResponseObject()

        # User-facing operations: no token required.
        if operation == 'get-device-info':  # device-type icon info
            return self.get_device_info(response)
        if operation == 'get-product-info':  # product info
            # The handler needs the response object; the original passed the
            # HTTP request here, which has no ``json`` method.
            return self.get_product_info(request_dict, response)

        tko = TokenObject(
            request.META.get('HTTP_AUTHORIZATION'),
            returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang

        if operation == 'get-info':  # list product info
            return self.get_info(request_dict, response)
        if operation == 'add-info':  # create product info
            return self.add_info(request_dict, request, response)
        if operation == 'edit-status':  # change status
            return self.edit_status(request_dict, response)
        if operation == 'upload-file':  # replace file
            return self.upload_file(request_dict, request, response)
        if operation == 'edit-info':  # edit title / url
            return self.edit_info(request_dict, response)
        return response.json(404)

    @staticmethod
    def _server_error(response, e):
        """Print ``e`` and return the code-500 response describing it."""
        print(e)
        return response.json(
            500,
            'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def _storage_location(cls, product_type, file_name):
        """Return ``(key, url)`` for an uploaded file, or ``None``.

        ``None`` is returned when ``product_type`` (an int) is not a type
        that carries an uploaded file.
        """
        directory = cls._UPLOAD_DIRS.get(product_type)
        if directory is None:
            return None
        key = directory + file_name
        return key, cls._CDN_BASE_URL + key

    @staticmethod
    def get_info(request_dict, response):
        """List the on-shelf product records of one device type and product type.

        :param request_dict: expects ``vsees_id`` (device type id) and
            ``type`` (0: video, 1: manual, 2: firmware)
        :param response: response object used to build the reply
        :return: code 0 with a list of ``title`` / ``file_name`` / ``status``
            rows, newest first; 444 when a parameter is missing; 173 when
            nothing matches; 500 on an unexpected error
        """
        vsees_id = request_dict.get('vsees_id', None)
        product_type = request_dict.get('type', None)
        if not all([vsees_id, product_type]):
            return response.json(444, 'vsees_id, type')
        try:
            rows = list(
                vseesProductInfo.objects
                .filter(vsees__id=vsees_id, type=product_type, status=0)
                .values('title', 'file_name', 'status')
                .order_by('-add_time'))
            if not rows:
                return response.json(173)
            return response.json(0, rows)
        except Exception as e:
            return VseesManagement._server_error(response, e)

    @classmethod
    def add_info(cls, request_dict, request, response):
        """Create a product record, uploading its file when the type needs one.

        :param request_dict: expects ``vsees_id`` (device type id), ``title``,
            ``type`` (0: video, 1: manual, 2: firmware) and, for type 0, ``url``
        :param request: HTTP request; ``request.FILES['file']`` is required
            for types 1 and 2
        :param response: response object used to build the reply
        :return: code 0 on success; 444 on missing or invalid parameters;
            174 when an on-shelf record with the same device type, product
            type and title already exists; 178 when the upload fails;
            500 on an unexpected error
        """
        param_error = 'error: file or url, title, type, vsees_id'
        file = request.FILES.get('file', None)
        vsees_id = request_dict.get('vsees_id', None)
        url = request_dict.get('url', None)
        title = request_dict.get('title', None)
        product_type = request_dict.get('type', None)
        if not all([file or url, title, product_type, vsees_id]):
            return response.json(444, param_error)

        try:
            device_type = int(product_type)
        except (TypeError, ValueError):
            return response.json(444, param_error)

        # A video record stores a link; manual and firmware records need the
        # uploaded file itself. Reject anything else up front instead of
        # failing later with an unrelated 500.
        if device_type == 0:
            if not url:
                return response.json(444, param_error)
        elif device_type in cls._UPLOAD_DIRS:
            if not file:
                return response.json(444, param_error)
        else:
            return response.json(444, param_error)

        now_time = int(time.time())

        # Refuse a duplicate title among on-shelf records.
        duplicate_qs = vseesProductInfo.objects.filter(
            vsees__id=vsees_id, title=title, type=product_type, status=0)
        if duplicate_qs.exists():
            return response.json(174)

        try:
            with transaction.atomic():
                data = {
                    'vsees_id': vsees_id,
                    'title': title,
                    'type': device_type,
                    'status': 0,
                    'add_time': now_time,
                    'upd_time': now_time,
                }
                if device_type == 0:
                    # Link record: the url doubles as the file name.
                    data['file_name'] = url
                    data['url'] = url
                else:
                    file_name = str(file)
                    key, file_url = cls._storage_location(device_type, file_name)
                    if not cls.upload(file, key):
                        return response.json(178)
                    data['file_name'] = file_name
                    data['url'] = file_url
                vseesProductInfo.objects.create(**data)
                return response.json(0)
        except Exception as e:
            return cls._server_error(response, e)

    @classmethod
    def upload(cls, file, key):
        """Upload ``file`` to the ``ansjerfilemanager`` bucket under ``key``.

        :param file: uploaded file object; its ``content_type`` is sent as
            the object's content type
        :param key: destination path plus file name inside the bucket
        :return: ``True`` when the upload helper reports success, else ``False``
        """
        bucket_name = 'ansjerfilemanager'
        client = AmazonS3Util(
            AWS_ACCESS_KEY_ID[1], AWS_SECRET_ACCESS_KEY[1], AWS_SES_ACCESS_REGION)
        uploaded = client.upload_file_obj(
            bucket_name,
            key,
            file,
            {'ContentType': file.content_type, 'ACL': 'public-read'})
        return bool(uploaded)

    @classmethod
    def upload_file(cls, request_dict, request, response):
        """Replace the file of an existing on-shelf product record.

        :param request_dict: expects ``product_id`` (record id) and ``type``
            (1: manual, 2: firmware)
        :param request: HTTP request carrying ``request.FILES['file']``
        :param response: response object used to build the reply
        :return: code 0 on success; 444 on missing or invalid parameters;
            177 when no on-shelf record matches or the upload fails;
            500 on an unexpected error
        """
        param_error = 'error: file, type, product_id'
        file = request.FILES.get('file', None)
        product_id = request_dict.get('product_id', None)
        product_type = request_dict.get('type', None)
        if not all([file, product_type, product_id]):
            return response.json(444, param_error)

        # Request values arrive as strings. The original compared the raw
        # string with the int 1, so manuals were always stored under the
        # firmware prefix.
        try:
            type_value = int(product_type)
        except (TypeError, ValueError):
            return response.json(444, param_error)

        file_name = str(file)
        location = cls._storage_location(type_value, file_name)
        if location is None:
            # This product type has no uploaded file to replace.
            return response.json(444, param_error)
        key, url = location

        now_time = int(time.time())
        product_info_qs = vseesProductInfo.objects.filter(
            id=product_id, type=product_type, status=0)
        if not product_info_qs.exists():
            return response.json(177)
        try:
            if not cls.upload(file, key):
                return response.json(177)
            vseesProductInfo.objects.filter(id=product_id).update(
                url=url, upd_time=now_time, file_name=file_name)
            return response.json(0)
        except Exception as e:
            return cls._server_error(response, e)

    @staticmethod
    def edit_status(request_dict, response):
        """Take a product record off the shelf.

        :param request_dict: expects ``product_id`` (record id) and
            ``status`` (0: on shelf, 1: off shelf)
        :param response: response object used to build the reply
        :return: code 0 on success; 444 on missing or non-numeric
            parameters; 177 when asked to set status 0 (only taking a
            record off the shelf is allowed here)
        """
        product_id = request_dict.get('product_id', None)
        status = request_dict.get('status', None)
        if not all([status, product_id]):
            return response.json(444, 'error: status, product_id')
        try:
            status_value = int(status)
        except (TypeError, ValueError):
            return response.json(444, 'error: status, product_id')
        # Only taking a record off the shelf is supported.
        if status_value == 0:
            return response.json(177)
        vseesProductInfo.objects.filter(id=product_id).update(status=status_value)
        return response.json(0)

    @staticmethod
    def edit_info(request_dict, response):
        """Update the title and/or url of an on-shelf product record.

        :param request_dict: expects ``product_id`` (record id); ``title``
            and ``url`` are optional and only applied when non-empty
        :param response: response object used to build the reply
        :return: code 0 on success; 444 when ``product_id`` is missing;
            174 when the record already carries the requested title
        """
        product_id = request_dict.get('product_id', None)
        url = request_dict.get('url', None)
        title = request_dict.get('title')
        if not product_id:
            return response.json(444, 'error: product_id')

        # The record to modify. The original ran its updates on a queryset
        # that was also filtered by the new title, which is empty whenever
        # the duplicate check passes, so nothing was ever written.
        target_qs = vseesProductInfo.objects.filter(id=product_id, status=0)
        if title and target_qs.filter(title=title).exists():
            return response.json(174)

        changes = {}
        if title:
            changes['title'] = title
        if url:
            changes['url'] = url
        if changes:
            target_qs.update(**changes)
        return response.json(0)

    # User-facing endpoints ---------------------------------------------------
    @staticmethod
    def get_device_info(response):
        """Return every Vsees device-type row, ordered by id.

        :param response: response object used to build the reply
        :return: code 0 with the list of rows; 500 on an unexpected error
        """
        try:
            device_types = VseesDeviceType.objects.all().values().order_by('id')
            return response.json(0, list(device_types))
        except Exception as e:
            return VseesManagement._server_error(response, e)

    @staticmethod
    def get_product_info(request_dict, response):
        """Return product titles, or the url of one titled product.

        With ``type`` but no ``title`` the reply lists the titles of the
        on-shelf products of that device type and product type. Otherwise
        the reply carries the url of the newest on-shelf product matching
        ``title`` and ``type``.

        :param request_dict: expects ``vsees_id`` (device type id); optional
            ``type`` (0: video, 1: manual, 2: firmware) and ``title``
        :param response: response object used to build the reply
        :return: code 0 with a list of ``{'title': ...}`` rows or with
            ``{'url': ...}``; 444 when ``vsees_id`` is missing; 173 when no
            product matches the title lookup; 500 on an unexpected error
        """
        vsees_id = request_dict.get('vsees_id', None)
        title = request_dict.get('title', None)
        product_type = request_dict.get('type', None)
        if not vsees_id:
            return response.json(444, 'vsees_id')
        try:
            on_shelf_qs = vseesProductInfo.objects.filter(
                vsees__id=vsees_id, status=0)
            if not title and product_type:
                # Restrict the list to the requested type; the original
                # required ``type`` but ignored it, so titles of every type
                # were returned.
                titles = (on_shelf_qs
                          .filter(type=product_type)
                          .values('title')
                          .order_by('-add_time'))
                return response.json(0, list(titles))

            newest = (on_shelf_qs
                      .filter(title=title, type=product_type)
                      .values('url')
                      .order_by('-add_time')
                      .first())
            if newest is None:
                return response.json(173)
            return response.json(0, {'url': newest['url']})
        except Exception as e:
            return VseesManagement._server_error(response, e)
|