#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time import oss2 from django.views import View from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET from Model.models import VoicePromptModel, UidChannelSetModel, Device_Info from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Service.CommonService import CommonService from Service.ModelService import ModelService class VoicePromptView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.GET operation = kwargs.get('operation', None) return self.validate(request_dict, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' request_dict = request.POST operation = kwargs.get('operation', None) return self.validate(request_dict, operation) def validate(self, request_dict, operation): token = request_dict.get('token', None) lang = request_dict.get('lang', None) response = ResponseObject(lang=lang) token = TokenObject(token) if token.code != 0: return response.json(token.code) if operation == 'getUploadUrl': return self.get_upload_url(request_dict, response) elif operation == 'add': return self.do_add(request_dict, response) elif operation == 'delete': return self.do_delete(token.userID, request_dict, response) elif operation == 'query': return self.do_query(request_dict, response) elif operation == 'update': return self.do_update(token.userID, request_dict, response) elif operation == 'adminGetUploadUrl': return self.admin_get_upload_url(token.userID, request_dict, response) elif operation == 'adminAdd': return self.do_admin_add(token.userID, request_dict, response) elif operation == 'adminQuery': return self.do_admin_query(token.userID, request_dict, response) elif operation == 'adminUpdate': return self.do_admin_update(token.userID, request_dict, response) elif operation == 'adminDelete': return self.do_admin_delete(token.userID, request_dict, response) else: return response.json(404) def get_upload_url(self, request_dict, response): upload_type = request_dict.get('upload_type', None) uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) type = request_dict.get('type', None) if upload_type and uid and channel: count = VoicePromptModel.objects.filter(uid=uid, channel=channel, type=type).count() if count >= 3: return response.json(201) auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') name = CommonService.createOrderID() filename = str(name) + '.' + upload_type obj = 'voice_prompt/{uid}/{channel}/'.format(uid=uid, channel=channel) + filename url = bucket.sign_url('PUT', obj, 7200) return response.json(0, {'put_url': url, 'filename': filename}) else: return response.json(444) def do_add(self, request_dict, response): filename = request_dict.get('filename', None) title = request_dict.get('title', None) type = request_dict.get('type', None) lang = request_dict.get('lang', '') uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) if filename and title and type and uid and channel: voice_prompt = VoicePromptModel() voice_prompt.filename = filename voice_prompt.title = title voice_prompt.type = type voice_prompt.language = lang voice_prompt.classification = 1 voice_prompt.uid = uid voice_prompt.channel = channel voice_prompt.add_time = int(time.time()) voice_prompt.save() res = { 'id': voice_prompt.id, 'filename': filename, 'title': title, 'type': type, } auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') filename = res['filename'] obj = 'voice_prompt/uid/channel/'.format(uid=uid, channel=channel) + filename url = bucket.sign_url('GET', obj, 3600) res['url'] = url del res['filename'] return response.json(0, res) else: return response.json(444) def do_update(self, userID, request_dict, response): id = request_dict.get('id', None) title = request_dict.get('title', None) if id and title: voice_qs = VoicePromptModel.objects.filter(id=id) if voice_qs.exists(): uid = voice_qs[0].uid device_qs = Device_Info.objects.filter(UID=uid, userID=userID) if device_qs.exists(): voice_qs.update(title=title) return response.json(0) else: return response.json(404) else: return response.json(173) else: return response.json(444) def do_delete(self, userID, request_dict, response): id = request_dict.get('id', None) if id: voice_qs = VoicePromptModel.objects.filter(id=id) if voice_qs.exists(): uid = voice_qs[0].uid device_qs = Device_Info.objects.filter(UID=uid, userID=userID) if device_qs.exists(): voice_qs.delete() # auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) # bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') # obj = 'voice_prompt/{uid}/{channel}/'.format(uid=uid, channel=voice_qs[0].channel) + voice_qs[0].filename # bucket.delete_object(obj) return response.json(0) else: return response.json(404) else: return response.json(444) def do_query(self, request_dict, response): lang = request_dict.get('lang', None) uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) if uid and channel and lang: voice_qs = VoicePromptModel.objects.filter(uid=uid, channel=channel, classification=1) system_qs = VoicePromptModel.objects.filter(classification=0, language=lang, status=1) channel_qs = UidChannelSetModel.objects.filter(uid__uid=uid, channel=channel) res = { 'enter_voice': {}, 'leave_voice': {}, 'voice_status': 0, 'voice_mute': 0 } if channel_qs.exists(): channel_qs = channel_qs.values('voice_prompt_enter', 'voice_prompt_leave', 'voice_prompt_status', 'voice_prompt_intelligent_mute', 'voice_start_x', 'voice_start_y', 'voice_end_x', 'voice_end_y', 'voice_start_time', 'voice_end_time', 'voice_repeat_day', 'voice_direction') print(channel_qs) res['enter_voice'] = int(channel_qs[0]['voice_prompt_enter']) res['leave_voice'] = int(channel_qs[0]['voice_prompt_leave']) res['voice_status'] = channel_qs[0]['voice_prompt_status'] res['voice_mute'] = channel_qs[0]['voice_prompt_intelligent_mute'] res['start_x'] = channel_qs[0]['voice_start_x'] res['start_y'] = channel_qs[0]['voice_start_y'] res['end_x'] = channel_qs[0]['voice_end_x'] res['end_y'] = channel_qs[0]['voice_end_y'] res['start_time'] = channel_qs[0]['voice_start_time'] res['end_time'] = channel_qs[0]['voice_end_time'] res['repeat_day'] = channel_qs[0]['voice_repeat_day'] res['direction'] = channel_qs[0]['voice_direction'] enter_systems = [] leave_systems = [] enter_customs = [] leave_customs = [] res['system'] = {} res['custom'] = {} auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') if system_qs.exists(): system_qs = system_qs.values('id', 'title', 'filename', 'type') for system in system_qs: filename = system['filename'] obj = 'voice_prompt/system/' + filename url = bucket.sign_url('GET', obj, 3600) system['url'] = url del system['filename'] if system['type'] == 0: enter_systems.append(system) if res['enter_voice'] == system['id']: res['enter_voice'] = system elif system['type'] == 1: leave_systems.append(system) if res['leave_voice'] == system['id']: res['leave_voice'] = system if voice_qs.exists(): voice_qs = voice_qs.values('id', 'title', 'filename', 'type') for voice in voice_qs: filename = voice['filename'] obj = 'voice_prompt/' + uid + '/' + channel + '/' + filename url = bucket.sign_url('GET', obj, 3600) voice['url'] = url del voice['filename'] if voice['type'] == 0: enter_customs.append(voice) if res['enter_voice'] == voice['id']: res['enter_voice'] = voice elif voice['type'] == 1: leave_customs.append(voice) if res['leave_voice'] == voice['id']: res['leave_voice'] = voice if res['leave_voice'] == 0: res['leave_voice'] = {} if res['enter_voice'] == 0: res['enter_voice'] = {} res['system']['enter'] = enter_systems res['system']['leave'] = leave_systems res['custom']['enter'] = enter_customs res['custom']['leave'] = leave_customs return response.json(0, res) else: return response.json(444) def admin_get_upload_url(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) upload_type = request_dict.get('upload_type', None) if upload_type: auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') name = CommonService.createOrderID() filename = str(name) + '.' + upload_type obj = 'voice_prompt/system/' + filename url = bucket.sign_url('PUT', obj, 7200) return response.json(0, {'put_url': url, 'filename': filename}) else: return response.json(444) def do_admin_add(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) filename = request_dict.get('filename', None) title = request_dict.get('title', None) type = request_dict.get('type', None) lang = request_dict.get('lang', '') if filename and title and type: voice_prompt = VoicePromptModel() voice_prompt.filename = filename voice_prompt.title = title voice_prompt.type = type voice_prompt.language = lang voice_prompt.classification = 0 voice_prompt.add_time = int(time.time()) voice_prompt.status = 0 voice_prompt.save() return response.json(0) else: return response.json(444) def do_admin_query(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) type = request_dict.get('type', 0) page = request_dict.get('page', None) line = request_dict.get('line', None) if page is None or line is None: return response.json(444) voice_qs = VoicePromptModel.objects.filter(classification=0, type=type) res = { 'count': 0 } if voice_qs.exists(): page = int(page) line = int(line) start = (page - 1) * line end = start + line count = voice_qs.count() res['count'] = count voice_qs = voice_qs.values('id', 'title', 'type', 'language', 'status', 'filename')[start:end] auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET) bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'ansjer-static-resources') for item in voice_qs: filename = item['filename'] obj = 'voice_prompt/system/' + filename url = bucket.sign_url('GET', obj, 3600) item['url'] = url del item['filename'] res['data'] = list(voice_qs) return response.json(0, res) else: res['data'] = [] return response.json(0, res) def do_admin_update(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) id = request_dict.get('id', None) status = request_dict.get('status', None) title = request_dict.get('title', None) if id and status and title: VoicePromptModel.objects.filter(id=id, classification=0).update(status=status, title=title) return response.json(0) else: return response.json(444) def do_admin_delete(self, userID, request_dict, response): own_perm = ModelService.check_perm(userID, 10) if not own_perm: return response.json(404) id = request_dict.get('id', None) if id: VoicePromptModel.objects.filter(id=id, classification=0).delete() return response.json(0) else: return response.json(444)