#!/usr/bin/env python3 # -*- coding: utf-8 -*- import operator import time import oss2 from django.db.models import F from django.views import View from Model.models import DeviceTypeModel, AppBundle, DeviceNameLanguage from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Service.CommonService import CommonService from Service.ModelService import ModelService from django.conf import settings OSS_STS_ACCESS_KEY = settings.OSS_STS_ACCESS_KEY OSS_STS_ACCESS_SECRET = settings.OSS_STS_ACCESS_SECRET class DeviceTypeView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET operation = kwargs.get('operation', None) return self.validate(request_dict, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST operation = kwargs.get('operation', None) return self.validate(request_dict, operation) def validate(self, request_dict, operation): token = request_dict.get('token', None) lang = request_dict.get('lang', None) response = ResponseObject(lang=lang) token = TokenObject(token) if token.code != 0: return response.json(token.code) if operation == 'add': return self.do_admin_add(token.userID, request_dict, response) elif operation == 'query': return self.do_query(request_dict, response) elif operation == 'delete': return self.do_admin_delete(token.userID, request_dict, response) elif operation == 'getUploadUrl': return self.get_upload_url(request_dict, response) else: return response.json(404) def get_upload_url(self, request_dict, response): upload_type = request_dict.get('upload_type', None) if upload_type: auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') name = CommonService.createOrderID() filename = str(name) + '.' + upload_type obj = 'device_type/' + filename url = bucket.sign_url('PUT', obj, 7200) return response.json(0, {'put_url': url, 'filename': filename}) else: return response.json(444) def do_admin_add(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) name = request_dict.get('name', None) model = request_dict.get('model', None) type = request_dict.get('type', None) ptz_type = request_dict.get('ptz_type', None) icon = request_dict.get('icon', None) if name and model and type and ptz_type and icon: now_time = int(time.time()) device_type = DeviceTypeModel() device_type.name = name device_type.model = model device_type.type = type device_type.ptz_type = ptz_type device_type.icon = icon device_type.add_time = now_time device_type.update_time = now_time device_type.save() return response.json(0) else: return response.json(444) def do_query(self, request_dict, response): """ 获取设备图标 @param request_dict:请求参数 @request_dict app_bundle_id:包名 """ app_bundle_id = request_dict.get('app_bundle_id', None) if not app_bundle_id: app_bundle_id = 'com.ansjer.zccloud_a' try: lang = response.lang app_device_qs = DeviceNameLanguage.objects.filter(lang=lang) if not app_device_qs.exists(): lang = 'en' # 同步设备图标 app_bundle_qs = AppBundle.objects.filter(app_bundle_id=app_bundle_id).values_list( 'app_device_type__app_version_number_id').distinct().order_by('app_device_type__app_version_number_id') version_number_list = [app_bundle for app_bundle in app_bundle_qs] versions_filtered = version_number_list[:-1] app_bundle_list = [] for version in versions_filtered: version = ''.join(map(str, version)) app_bundle_qs = AppBundle.objects.filter(app_bundle_id=app_bundle_id, app_device_type__devicenamelanguage__lang=lang, app_device_type__app_version_number_id=version). \ annotate( model=F('app_device_type__model'), type=F('app_device_type__type'), icon=F('app_device_type__icon'), name=F('app_device_type__devicenamelanguage__name'), sort=F('app_device_type__devicenamelanguage__sort')).values('model', 'type', 'icon', 'name', 'sort') for app_bundle in app_bundle_qs: app_bundle_list.append({ 'model': app_bundle['model'], 'type': app_bundle['type'], 'icon': app_bundle['icon'], 'name': app_bundle['name'], 'sort': app_bundle['sort'], }) app_bundle_list = sorted(app_bundle_list, key=operator.itemgetter('sort')) return response.json(0, {'data': app_bundle_list}) except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) def do_admin_delete(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) id = request_dict.get('id', None) if id: DeviceTypeModel.objects.filter(id=id).delete() return response.json(0) else: return response.json(444)