123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import operator
- import time
- import oss2
- from django.db.models import F
- from django.views import View
- from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET
- from Model.models import DeviceTypeModel, AppBundle, DeviceNameLanguage
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Service.CommonService import CommonService
- from Service.ModelService import ModelService
- class DeviceTypeView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- request_dict = request.GET
- operation = kwargs.get('operation', None)
- return self.validate(request_dict, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- request_dict = request.POST
- operation = kwargs.get('operation', None)
- return self.validate(request_dict, operation)
- def validate(self, request_dict, operation):
- token = request_dict.get('token', None)
- lang = request_dict.get('lang', None)
- response = ResponseObject(lang=lang)
- token = TokenObject(token)
- if token.code != 0:
- return response.json(token.code)
- if operation == 'add':
- return self.do_admin_add(token.userID, request_dict, response)
- elif operation == 'query':
- return self.do_query(request_dict, response)
- elif operation == 'delete':
- return self.do_admin_delete(token.userID, request_dict, response)
- elif operation == 'getUploadUrl':
- return self.get_upload_url(request_dict, response)
- else:
- return response.json(404)
- def get_upload_url(self, request_dict, response):
- upload_type = request_dict.get('upload_type', None)
- if upload_type:
- auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
- bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources')
- name = CommonService.createOrderID()
- filename = str(name) + '.' + upload_type
- obj = 'device_type/' + filename
- url = bucket.sign_url('PUT', obj, 7200)
- return response.json(0, {'put_url': url, 'filename': filename})
- else:
- return response.json(444)
- def do_admin_add(self, userID, request_dict, response):
- own_perm = ModelService.check_perm(userID, 10)
- if not own_perm:
- return response.json(404)
- name = request_dict.get('name', None)
- model = request_dict.get('model', None)
- type = request_dict.get('type', None)
- ptz_type = request_dict.get('ptz_type', None)
- icon = request_dict.get('icon', None)
- if name and model and type and ptz_type and icon:
- now_time = int(time.time())
- device_type = DeviceTypeModel()
- device_type.name = name
- device_type.model = model
- device_type.type = type
- device_type.ptz_type = ptz_type
- device_type.icon = icon
- device_type.add_time = now_time
- device_type.update_time = now_time
- device_type.save()
- return response.json(0)
- else:
- return response.json(444)
- def do_query(self, request_dict, response):
- """
- 获取设备图标
- @param request_dict:请求参数
- @request_dict app_bundle_id:包名
- """
- app_bundle_id = request_dict.get('app_bundle_id', None)
- if not app_bundle_id:
- return response.json(444, {'error param': 'app_bundle_id'})
- try:
- lang = response.lang
- app_device_qs = DeviceNameLanguage.objects.filter(lang=lang)
- if not app_device_qs.exists():
- lang = 'en'
- # 同步设备图标
- app_bundle_qs = AppBundle.objects.filter(app_bundle_id=app_bundle_id).values_list(
- 'app_device_type__app_version_number_id').distinct().order_by('app_device_type__app_version_number_id')
- version_number_list = [app_bundle for app_bundle in app_bundle_qs]
- versions_filtered = version_number_list[:-1]
- app_bundle_list = []
- for version in versions_filtered:
- version = ''.join(map(str, version))
- app_bundle_qs = AppBundle.objects.filter(app_bundle_id=app_bundle_id,
- app_device_type__devicenamelanguage__lang=lang,
- app_device_type__app_version_number_id=version). \
- annotate(
- model=F('app_device_type__model'), type=F('app_device_type__type'), icon=F('app_device_type__icon'),
- name=F('app_device_type__devicenamelanguage__name'),
- sort=F('app_device_type__devicenamelanguage__sort')).values('model', 'type', 'icon',
- 'name', 'sort')
- for app_bundle in app_bundle_qs:
- app_bundle_list.append({
- 'model': app_bundle['model'],
- 'type': app_bundle['type'],
- 'icon': app_bundle['icon'],
- 'name': app_bundle['name'],
- 'sort': app_bundle['sort'],
- })
- app_bundle_list = sorted(app_bundle_list, key=operator.itemgetter('sort'))
- return response.json(0, {'data': app_bundle_list})
- except Exception as e:
- print(e)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- def do_admin_delete(self, userID, request_dict, response):
- own_perm = ModelService.check_perm(userID, 10)
- if not own_perm:
- return response.json(404)
- id = request_dict.get('id', None)
- if id:
- DeviceTypeModel.objects.filter(id=id).delete()
- return response.json(0)
- else:
- return response.json(444)
|