import json
import time

import boto3
import botocore
import oss2
from botocore import client
from django.http import JsonResponse
from django.views.generic.base import View

from Ansjer.config import (
    DETECT_PUSH_DOMAIN, DETECT_PUSH_DOMAINS, DETECT_PUSH_DOMAIN_JIUAN, DETECT_PUSH_DOMAINS_JIUAN,
    OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
)
from Ansjer.config import PUSH_REDIS_ADDRESS
from Model.models import (
    Device_Info, VodHlsModel, Equipment_Info, UidSetModel, UidPushModel, CompanyModel, SysMsgModel,
    AiService,
)
from Object.ETkObject import ETkObject
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
from Service.EquipmentInfoService import EquipmentInfoService

# Region code sent by the app -> (index into the AWS key lists, AWS region name,
# bucket that stores push images). 2: mainland China, 1: overseas.
_AWS_PUSH_TARGETS = {
    2: (0, 'cn-northwest-1', 'push'),
    1: (1, 'us-east-1', 'foreignpush'),
}


def _aws_push_target(region):
    """Return ``(s3_client, bucket_name)`` for *region*, or ``None`` if the region is unknown."""
    target = _AWS_PUSH_TARGETS.get(region)
    if target is None:
        return None
    key_index, region_name, bucket_name = target
    s3_client = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID[key_index],
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY[key_index],
        config=botocore.client.Config(signature_version='s3v4'),
        region_name=region_name
    )
    return s3_client, bucket_name


class DetectControllerViewV2(View):
    """Endpoints for detection push settings and push-message history.

    The URL keyword argument ``operation`` selects the action:
    ``changeStatus``, ``queryInfo`` or ``updateInterval``.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching the query parameters to :meth:`validation`."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        # self.ip = CommonService.get_ip_address(request)
        return self.validation(request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching the form parameters to :meth:`validation`."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        # self.ip = CommonService.get_ip_address(request)
        return self.validation(request.POST, operation)

    def validation(self, request_dict, operation):
        """Check the token and route to the handler for *operation*.

        Returns code 444 when no operation is given, the token error code when
        the token is rejected, and 414 for an unknown operation.
        """
        response = ResponseObject()
        if operation is None:
            return response.json(444, 'error path')
        token = request_dict.get('token', None)
        lang = request_dict.get('lang', None)
        if lang:
            response = ResponseObject(lang)
        tko = TokenObject(token)
        if tko.code != 0:
            return response.json(tko.code)
        userID = tko.userID
        if operation == 'changeStatus':
            # Change push settings.
            return self.do_change_status(userID, request_dict, response)
        if operation == 'queryInfo':
            # Query push messages.
            return self.do_query(request_dict, response, userID)
        if operation == 'updateInterval':
            # Update the push interval.
            return self.do_update_interval(userID, request_dict, response)
        return response.json(414)

    def do_change_status(self, userID, request_dict, response):
        """Enable/disable push for a device and register this phone's push token.

        Required parameters: ``region``, ``company_secrete``, ``appBundleId``,
        ``app_type``, ``token_val``, ``uid``, ``m_code`` and at least one of
        ``status`` / ``electricity_status``. Missing ones yield code 444.

        On ``status == 1`` the response carries ``detectUrl`` / ``detectUrls``.
        Code 14 means the user does not own the device, 173 an unsupported
        status value, and 500 wraps any unexpected exception.
        """
        token_val = request_dict.get('token_val', None)
        appBundleId = request_dict.get('appBundleId', None)
        app_type = request_dict.get('app_type', None)
        push_type = request_dict.get('push_type', None)
        status = request_dict.get('status', None)
        m_code = request_dict.get('m_code', None)
        uid = request_dict.get('uid', None)
        lang = request_dict.get('lang', 'en')
        tz = request_dict.get('tz', '0')
        company_secrete = request_dict.get('company_secrete', None)
        region = request_dict.get('region', None)  # The app must send it: 1 = overseas, 2 = mainland China.
        electricity_status = request_dict.get('electricity_status', None)
        if not region:
            return response.json(444, 'region')
        try:
            region = int(region)
        except (TypeError, ValueError):
            # A non-numeric region used to escape as an unhandled exception.
            return response.json(444, 'region')
        # Added for the message-reminder feature: an empty tz defaults to 0.
        if tz == '':
            tz = 0
        else:
            tz = tz.replace("GMT", "")
        detect_group = request_dict.get('detect_group', None)
        interval = request_dict.get('interval', None)
        if not status and not electricity_status:
            return response.json(444, 'status and electricity_status')
        if not company_secrete:
            return response.json(444, 'company_secrete')
        company = CompanyModel.objects.filter(secret=company_secrete)
        if not company.exists():
            return response.json(444, 'company_secrete')
        if not all([appBundleId, app_type, token_val, uid, m_code]):
            return response.json(444, 'appBundleId,app_type,token_val,uid,m_code')
        try:
            # Make sure the user owns the device; ``uid`` may also be a serial number.
            device_info_qs = Device_Info.objects.filter(userID_id=userID, UID=uid)
            if not device_info_qs.exists():
                device_info_qs = Device_Info.objects.filter(userID_id=userID, serial_number=uid)
            if not device_info_qs.exists():
                return response.json(14)

            # Update or create the uid_set row.
            nowTime = int(time.time())
            uid_set_data = {
                'device_type': device_info_qs[0].Type
            }
            # Switch state, 0: off, 1: on.
            if status:
                status = int(status)
                uid_set_data['detect_status'] = status
                device_info_qs.update(NotificationMode=status)
            # Detection type.
            if detect_group:
                uid_set_data['detect_group'] = detect_group
            uid_set = UidSetModel.objects.filter(uid=uid).order_by('-updTime')
            if uid_set.exists():
                interval = uid_set.first().new_detect_interval if not interval else interval
            # Push interval.
            if interval:
                interval = int(interval)
                uid_set_data['detect_interval'] = interval
                # Devices with an active AI service are told about the new
                # interval over MQTT.
                ai_service_qs = AiService.objects.filter(uid=uid, use_status=1, endTime__gte=nowTime)
                if ai_service_qs.exists():
                    topic_name = 'ansjer/generic/{}'.format(uid)
                    msg = {
                        'commandType': 'AIState',
                        'payload': {
                            'IntervalTime': interval
                        }
                    }
                    # Best effort: the publish result is deliberately not checked.
                    CommonService.req_publish_mqtt_msg(uid, topic_name, msg)

            uid_set_qs = UidSetModel.objects.filter(uid=uid)
            if uid_set_qs.exists():
                uid_set_id = uid_set_qs[0].id
                uid_set_data['updTime'] = nowTime
                uid_set_qs.update(**uid_set_data)
            else:
                uid_set_data['uid'] = uid
                uid_set_data['addTime'] = nowTime
                uid_set_data['updTime'] = nowTime
                uid_set_id = UidSetModel.objects.create(**uid_set_data).id

            # Fields written on both create and update of a UidPushModel row.
            push_update_fields = {
                'appBundleId': appBundleId,
                'app_type': app_type,
                'push_type': push_type,
                'token_val': token_val,
                'updTime': nowTime,
                'lang': lang,
                'tz': tz
            }
            push_create_fields = dict(
                push_update_fields,
                uid_set_id=uid_set_id,
                userID_id=userID,
                m_code=m_code,
                addTime=nowTime
            )

            # Initialise the UidPushModel push table.
            if electricity_status:
                # Bind the device push.
                UidPushModel.objects.create(**push_create_fields)
                return response.json(0)

            if status == 0:
                # Push switched off: drop the cached Redis data.
                self.do_delete_redis(uid)
                return response.json(0)
            if status != 1:
                return response.json(173)

            uid_push_qs = UidPushModel.objects.filter(userID_id=userID, m_code=m_code, uid_set__uid=uid)
            if uid_push_qs.exists():
                uid_push_qs.update(**push_update_fields)
            else:
                # Bind the device push.
                UidPushModel.objects.create(**push_create_fields)
            if interval:
                self.do_delete_redis(uid, interval)
            else:
                self.do_delete_redis(uid)

            etk = ETkObject(etk='').encrypt(uid)
            if company_secrete == 'MTEyMTNB':
                url = DETECT_PUSH_DOMAIN
                urls = DETECT_PUSH_DOMAINS
            else:
                url = DETECT_PUSH_DOMAIN_JIUAN
                urls = DETECT_PUSH_DOMAINS_JIUAN
            # The separator before ``region`` must be a literal "&"; it had been
            # mangled into the "®" character, which dropped the parameter.
            push_path = "notifyV2/push?etk={etk}&company_secrete={company_secrete}&region={region}". \
                format(etk=etk, company_secrete=company_secrete, region=region)
            detectUrl = "{domain}{path}".format(domain=url, path=push_path)
            detectUrls = "{domain}{path}".format(domain=urls, path=push_path)
            return response.json(0, {'detectUrl': detectUrl, 'detectUrls': detectUrls})
        except Exception as e:
            return response.json(500, repr(e))

    def do_delete_redis(self, uid, detect_interval=0):
        """Clear the push-server Redis keys that start with *uid*.

        With ``detect_interval == 0`` every matching key is deleted. Otherwise
        keys containing ``plt`` are left alone, keys containing ``flag`` are
        reset to 1 with *detect_interval* as expiry, and the rest are deleted.
        """
        keyPattern = '{uid}*'.format(uid=uid)
        redisObj = RedisObject(db=6, SERVER_HOST=PUSH_REDIS_ADDRESS)
        keys = redisObj.get_keys(keyPattern)
        if not keys:
            return
        for raw_key in keys:
            key = raw_key.decode()
            if detect_interval == 0:
                redisObj.del_data(key=key)
            elif 'plt' in key:
                # NOTE(review): PushNotificationView writes '{uid}_ptl' keys;
                # confirm that 'plt' here is the intended spelling.
                continue
            elif 'flag' in key:
                redisObj.set_data(key=key, val=1, expire=detect_interval)
            else:
                redisObj.del_data(key=key)

    @staticmethod
    def _presign_s3_get(aws_target, key, expires):
        """Return a presigned S3 GET URL for *key* using ``(client, bucket)`` *aws_target*."""
        s3_client, bucket_name = aws_target
        return s3_client.generate_presigned_url(
            'get_object', Params={'Bucket': bucket_name, 'Key': key}, ExpiresIn=expires)

    def _sign_image_url(self, oss_bucket, aws_target, storage_location, key, expires):
        """Sign *key* on the backend chosen by *storage_location* (1: OSS, 2: AWS).

        Returns ``None`` when no backend applies, e.g. AWS storage with a
        region that has no configured bucket.
        """
        if storage_location == 1:
            return oss_bucket.sign_url('GET', key, expires)
        if storage_location == 2 and aws_target is not None:
            return self._presign_s3_get(aws_target, key, expires)
        return None

    def do_query(self, request_dict, response, userID):
        """Return one page of the user's push messages with signed image URLs.

        Required parameters: ``page``, ``line`` and ``region``; missing or
        non-numeric values yield code 444. Optional: ``startTime``,
        ``endTime``, ``eventType``, ``uids`` (comma separated). Any unexpected
        exception is reported as code 474.
        """
        try:
            page = int(request_dict.get('page', None))
            line = int(request_dict.get('line', None))
        except (TypeError, ValueError):
            # int(None) used to raise before the 444 check below could run.
            return response.json(444, 'page,line')
        if not page or not line:
            return response.json(444, 'page,line')
        startTime = request_dict.get('startTime', None)
        endTime = request_dict.get('endTime', None)
        eventType = request_dict.get('eventType', None)
        region = request_dict.get('region', None)
        if not region:
            return response.json(444, 'region')
        try:
            region = int(region)
        except (TypeError, ValueError):
            return response.json(444, 'region')
        uids = request_dict.get('uids', None)
        try:
            if startTime and endTime:
                # Filter push messages by the requested time window.
                qs, count = EquipmentInfoService.find_by_start_time_equipment_info(
                    page, line, userID, startTime, endTime, eventType, uids)
            else:
                # Default: push messages of the last seven days.
                qs, count = EquipmentInfoService.get_equipment_info_week_all(
                    page, line, userID, startTime, endTime, eventType, uids)

            device_filter = {'userID_id': userID}
            if uids:
                device_filter['UID__in'] = uids.split(',')
            dvqs = Device_Info.objects.filter(**device_filter).values('UID', 'Type', 'NickName')
            uid_type_dict = {
                dv['UID']: {'type': dv['Type'], 'NickName': dv['NickName']} for dv in dvqs
            }

            if not qs or count == 0 or not qs.exists():
                return response.json(0, {'datas': [], 'count': 0})

            auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
            oss_img_bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
            # Only the S3 client of the requested region is ever used.
            aws_target = _aws_push_target(region)
            # All combined tags that mark an AI message.
            ai_all_event_type = EquipmentInfoService.get_all_comb_event_type()
            res = []
            for p in qs:
                devUid = p['devUid']
                eventTime = p['eventTime']
                channel = p['Channel']
                storage_location = p['storage_location']
                is_st = p['is_st']
                if is_st == 1:
                    # Single snapshot.
                    thumbspng = '{uid}/{channel}/{time}.jpeg'.format(uid=devUid, channel=channel, time=eventTime)
                    image_url = self._sign_image_url(oss_img_bucket, aws_target, storage_location, thumbspng, 300)
                    if image_url is not None:
                        p['img'] = image_url
                        p['img_list'] = [image_url]
                elif is_st == 2:
                    # Cloud-recording clip: thumbnails come from its first TS segment.
                    vodqs = VodHlsModel.objects.filter(uid=devUid, channel=channel, time=int(eventTime)) \
                        .values("bucket__bucket", "bucket__endpoint")
                    if vodqs.exists():
                        ts = '{uid}/vod{channel}/{etime}/ts0.ts'.format(uid=devUid, channel=channel, etime=eventTime)
                        if storage_location == 1:  # oss
                            bucket = oss2.Bucket(auth, vodqs[0]['bucket__endpoint'], vodqs[0]['bucket__bucket'])
                            # Frames at 0 s, 1 s and 2 s of the segment.
                            p['img_list'] = [
                                bucket.sign_url(
                                    'GET', ts, 3600,
                                    params={'x-oss-process': 'video/snapshot,t_{}000,w_700'.format(second)})
                                for second in range(3)
                            ]
                        elif storage_location == 2 and aws_target is not None:  # aws
                            p['img_list'] = [self._presign_s3_get(aws_target, ts, 3600)]
                elif is_st == 3 or is_st == 4:
                    # Burst of ``is_st`` snapshots named ``<time>_<index>.jpeg``.
                    p['img_list'] = []
                    for i in range(is_st):
                        thumbspng = '{uid}/{channel}/{time}_{st}.jpeg'.format(
                            uid=devUid, channel=channel, time=eventTime, st=i)
                        image_url = self._sign_image_url(
                            oss_img_bucket, aws_target, storage_location, thumbspng, 300)
                        if image_url is not None:
                            p['img_list'].append(image_url)

                device = uid_type_dict.get(devUid)
                if device is not None:
                    p['uid_type'] = device['type']
                    p['devNickName'] = device['NickName']
                else:
                    p['uid_type'] = ''

                # Coordinates attached to AI messages.
                p['borderCoords'] = '' if p['borderCoords'] == '' else json.loads(p['borderCoords'])
                p['ai_event_type_list'] = []
                if p['eventType'] in ai_all_event_type:
                    # AI message: split eventType into digits, e.g. 123 -> [1, 2, 3].
                    p['ai_event_type_list'] = list(map(int, str(p['eventType'])))
                # NOTE(review): the source lost its indentation; this line is assumed
                # to run for every message, not only for AI event types. Confirm.
                p['ai_event_type_list'] += EquipmentInfoService.get_combo_types(p['eventType'])
                res.append(p)
            return response.json(0, {'datas': res, 'count': count})
        except Exception as e:
            print(repr(e))
            return response.json(474)

    def do_update_interval(self, userID, request_dict, response):
        """Update the push interval stored for one of the user's devices.

        Returns code 0 on success (and, as before, when the user has no such
        device), 173 when the user has no push registration for the device,
        and 444 when ``interval`` is missing or not an integer.
        """
        uid = request_dict.get('uid', None)
        interval = request_dict.get('interval', None)
        dvqs = Device_Info.objects.filter(userID_id=userID, UID=uid)
        if not dvqs.exists():
            return response.json(0)
        uid_set_qs = UidSetModel.objects. \
            filter(uid=uid, uidpushmodel__userID_id=userID)
        if not uid_set_qs.exists():
            return response.json(173)
        try:
            interval = int(interval)
        except (TypeError, ValueError):
            return response.json(444, 'interval')
        # NOTE(review): do_change_status writes ``detect_interval``; confirm that
        # ``interval`` is a real UidSetModel field.
        uid_set_qs.update(interval=interval)
        # The success path used to fall off the end and return None, which is
        # not a valid view response.
        return response.json(0)


# This interface has never been called, so it has been left largely alone.
# http://test.dvema.com/detect/add?uidToken=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1aWQiOiJQMldOR0pSRDJFSEE1RVU5MTExQSJ9.xOCI5lerk8JOs5OcAzunrKCfCrtuPIZ3AnkMmnd-bPY&n_time=1526845794&channel=1&event_type=51&is_st=0
# Motion-detection interface.
class PushNotificationView(View):
    """Endpoint that records a detection event and hands out image upload URLs."""

    def get(self, request, *args, **kwargs):
        """Handle GET by passing the query parameters to :meth:`validation`."""
        request.encoding = 'utf-8'
        # operation = kwargs.get('operation')
        return self.validation(request.GET)

    def post(self, request, *args, **kwargs):
        """Handle POST by passing the form parameters to :meth:`validation`."""
        request.encoding = 'utf-8'
        # operation = kwargs.get('operation')
        return self.validation(request.POST)

    def validation(self, request_dict):
        """Record the event for every subscriber and return presigned upload URLs.

        Parameters read: ``etk``, ``channel`` (default ``'1'``), ``n_time``,
        ``event_type``, ``is_st`` and ``region`` (default ``'2'``). Events for
        a uid are throttled to one per 60 seconds through a Redis key. The
        HTTP status is always 200; the JSON ``code`` carries the outcome.
        """
        etk = request_dict.get('etk', None)
        channel = request_dict.get('channel', '1')
        n_time = request_dict.get('n_time', None)
        event_type = request_dict.get('event_type', None)
        is_st = request_dict.get('is_st', None)
        region = request_dict.get('region', '2')
        try:
            region = int(region)
        except (TypeError, ValueError):
            return JsonResponse(status=200, data={'code': 444, 'msg': 'region'})
        eto = ETkObject(etk)
        uid = eto.uid
        if len(uid) != 20:
            return JsonResponse(status=200, data={'code': 404, 'msg': 'wrong etk'})

        redisObj = RedisObject(db=6)
        pkey = '{uid}_ptl'.format(uid=uid)
        ykey = '{uid}_redis_qs'.format(uid=uid)
        if redisObj.get_data(key=pkey):
            # Throttled: an event for this uid was accepted within the last 60 s.
            res_data = {'code': 0, 'msg': 'success,!33333333333'}
            return JsonResponse(status=200, data=res_data)
        redisObj.set_data(key=pkey, val=1, expire=60)

        redis_data = redisObj.get_data(key=ykey)
        if redis_data:
            # SECURITY NOTE(review): eval() executes whatever is stored under this
            # key. The value is written below via str(list-of-dicts); consider
            # storing JSON and parsing with json.loads instead.
            redis_list = eval(redis_data)
        else:
            # Push at most once every 60 seconds.
            redisObj.set_data(key=pkey, val=1, expire=60)
            print("从数据库查到数据")
            # Load the subscribers from the database.
            uid_push_qs = UidPushModel.objects.filter(uid_set__uid=uid, uid_set__detect_status=1). \
                values('token_val', 'app_type', 'appBundleId', 'push_type', 'userID_id', 'lang', 'm_code',
                       'tz', 'uid_set__nickname')
            redis_list = list(uid_push_qs)
            # Cache the subscribers for 10 minutes.
            if redis_list:
                redisObj.set_data(key=ykey, val=str(redis_list), expire=600)

        if not redis_list:
            # No subscriber: nothing to record; previously this could reach uaqs[0].
            return JsonResponse(status=200, data={'code': 404, 'msg': 'data is not exist'})
        try:
            int(event_type)
            int(is_st)
        except (TypeError, ValueError):
            return JsonResponse(status=200, data={'code': 444, 'msg': 'event_type,is_st'})
        if not n_time:
            return JsonResponse(status=200, data={'code': 444, 'msg': 'n_time'})

        self.do_bulk_create_info(redis_list, n_time, channel, event_type, is_st, uid)

        if is_st == '0' or is_st == '2':
            return JsonResponse(status=200, data={'code': 0, 'msg': 'success44444444444444444'})
        if is_st not in ('1', '3'):
            return JsonResponse(status=200, data={'code': 404, 'msg': 'data is not exist'})

        # 2: mainland China; every other value uses the overseas bucket.
        s3_client, bucket_name = _aws_push_target(2 if region == 2 else 1)

        def presign_put(key):
            return s3_client.generate_presigned_url(
                ClientMethod='put_object',
                Params={
                    'Bucket': bucket_name,
                    'Key': key
                },
                ExpiresIn=3600
            )

        if is_st == '1':
            # Single snapshot upload URL.
            thumbspng = '{uid}/{channel}/{filename}.jpeg'.format(uid=uid, channel=channel, filename=n_time)
            res_data = {'code': 0, 'img_push': presign_put(thumbspng), 'msg': 'success'}
            return JsonResponse(status=200, data=res_data)

        # is_st == '3': human detection with an animated preview, one URL per frame.
        img_url_list = [
            presign_put('{uid}/{channel}/{filename}_{st}.jpeg'.format(
                uid=uid, channel=channel, filename=n_time, st=i))
            for i in range(int(is_st))
        ]
        res_data = {'code': 0, 'img_url_list': img_url_list, 'msg': 'success'}
        return JsonResponse(status=200, data=res_data)

    def do_bulk_create_info(self, uaqs, n_time, channel, event_type, is_st, uid):
        """Insert one ``Equipment_Info`` row per distinct user in *uaqs*.

        For system event types (see :meth:`is_sys_msg`) a ``SysMsgModel`` row
        is created per user as well. *uaqs* is a list of dicts with at least
        ``userID_id``, ``lang``, ``tz`` and ``uid_set__nickname``. Always
        returns ``True``; an empty *uaqs* is a no-op.
        """
        if not uaqs:
            return True
        now_time = int(time.time())
        is_sys_msg = self.is_sys_msg(int(event_type))
        is_st = int(is_st)
        # Device nickname, falling back to the uid.
        nickname = uaqs[0]['uid_set__nickname'] or uid
        # A user with several registered phones appears several times in uaqs.
        # The original list of seen users was never filled, so the check
        # against it never skipped anyone.
        seen_user_ids = set()
        eq_list = []
        sys_msg_list = []
        for ua in uaqs:
            userID_id = ua["userID_id"]
            if userID_id in seen_user_ids:
                continue
            seen_user_ids.add(userID_id)
            eq_list.append(Equipment_Info(
                userID_id=userID_id,
                eventTime=n_time,
                eventType=event_type,
                devUid=uid,
                devNickName=nickname,
                Channel=channel,
                alarm='Motion \tChannel:{channel}'.format(channel=channel),
                is_st=is_st,
                receiveTime=n_time,
                addTime=now_time,
                storage_location=2
            ))
            if is_sys_msg:
                sys_msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=ua['lang'], tz=ua['tz'],
                                                 event_type=event_type, is_sys=1)
                sys_msg_list.append(SysMsgModel(
                    userID_id=userID_id,
                    msg=sys_msg_text,
                    addTime=now_time,
                    updTime=now_time,
                    uid=uid,
                    eventType=event_type))
        if eq_list:
            print('eq_list')
            Equipment_Info.objects.bulk_create(eq_list)
        if is_sys_msg:
            print('sys_msg')
            SysMsgModel.objects.bulk_create(sys_msg_list)
        return True

    def is_sys_msg(self, event_type):
        """Return ``True`` when *event_type* (int) is 702, 703 or 704."""
        return event_type in (702, 703, 704)

    def get_msg_text(self, channel, n_time, lang, tz, event_type, is_sys=0):
        """Build the message text for an event.

        The text is Chinese when ``lang == 'cn'`` and English otherwise. The
        date part is left out when *is_sys* is truthy. Event types other than
        702/703/704 produce an empty label.
        """
        n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz)
        etype = int(event_type)
        if lang == 'cn':
            labels = {704: '电量过低', 702: '摄像头休眠', 703: '摄像头唤醒'}
            short_template = '{msg_type} 通道:{channel}'
            dated_template = '{msg_type} 通道:{channel} 日期:{date}'
        else:
            labels = {704: 'Low battery', 702: 'Camera sleep', 703: 'Camera wake'}
            short_template = '{msg_type} channel:{channel}'
            dated_template = '{msg_type} channel:{channel} date:{date}'
        msg_type = labels.get(etype, '')
        if is_sys:
            return short_template.format(msg_type=msg_type, channel=channel)
        return dated_template.format(msg_type=msg_type, channel=channel, date=n_date)