from django.views.generic.base import View from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt from Service.TokenManager import JSONTokenManager from Service.ModelService import ModelService from Model.models import Equipment_Stream from Object.AWS.S3ClassObject import S3ClassObject from Object.AWS.ElasticTranscoder import ElasticTranscoder import traceback, xmltodict, requests, re, subprocess from ffmpy import FFmpeg import time from Object.AWS.CloudfrontSignCookie import BetterThanBoto from Ansjer.settings import NGINX_RTMP_STAT,RTMP_PUSH_URL from Service.ResponseService import * from Ansjer.config import BASE_DIR import os ''' 移动端=> 增: http://192.168.136.40:8077/media/stream?token=test&channel=1&status=1&uid=1&operation=add 删: http://192.168.136.40:8077/media/stream?token=test&id=11&id=12&operation=delete 改: http://192.168.136.40:8077/media/stream?token=test&id=13&operation=update&status=0 查: http://192.168.136.40:8077/media/stream?token=test&operation=query 获取视频播放地址 http://13.56.215.252:82/media/stream?token=test&channel=99&uid=2N1K3LE78TYJ38CE111A&filename=1526882855.flv&operation=getVodHls http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&operation=getVodUrl&filename=1_1-1523247439.mp4 获取所有保存s3的视频 http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&operation=getAllVideo 删除设备视频多个,key值传多个即可 http://192.168.136.40:8077/media/stream?token=test&key=UKPAH63V23U4ZHEB111A_1/UKPAH63V23U4ZHEB111A_1-1524039164.mp4&uid=UKPAH63V23U4ZHEB111A&channel=1&operation=getDelVideo ---------------------------------------------------------------------------------------------------- 后台=> 验证推流权限 http://192.168.136.40:8077/media/auth_stream?userID=151547867345163613800138001&uid=1&channel=1&access_token=test 验证播放权限(直播) http://192.168.136.40:8077/media/auth_live?userID=151547867345163613800138001&uid=1&channel=1&token=test rtmp://192.168.136.45:1935/hls/UKPAH63V23U4ZHEB111A_88?userID=151547867 获取所有推流的设备信息 http://192.168.136.40:8077/media/stream?token=test&page=1&line=10&operation=getAdminAllStream 删除流 http://192.168.136.40:8077/media/stream?token=test&id=1&id=2&id=3&operation=getAdminDelStream 删除指定设备视频 http://192.168.136.40:8077/media/stream?token=test&key=1&key=2&key=3&operation=getAdminDelVideo 更新 http://192.168.136.40:8077/media/stream?token=test&id=13&operation=getAdminUpdateStream&status=0 获取指定设备存储的视频播放地址 http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&filename=1_1-1523247439.mp4&operation=getAdminVodUrl hls播放 http://13.56.215.252:82/media/stream?token=test&channel=1&status=1&uid=2N1K3LE78TYJ38CE111A&operation=add&rank=1 ''' class StreamMedia(View): @method_decorator(csrf_exempt) def dispatch(self, *args, **kwargs): return super(StreamMedia, self).dispatch(*args, **kwargs) def get(self, request, *args, **kwargs): request.encoding = 'utf-8' return self.validation(request_dict=request.GET) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' return self.validation(request_dict=request.POST) def validation(self, request_dict, *args, **kwargs): token = request_dict.get('token', None) if token is not None: tokenManager = JSONTokenManager() error_code = tokenManager.verify_AToken(token) if error_code == 0: userID = tokenManager.accessDict.get('userID', None) own_permission = ModelService.check_permission(userID=userID, permID=60) if own_permission is not True: return ResponseJSON(404) operation = request_dict.get('operation', None) if operation == 'add': return self.add_stream(token=token, request_dict=request_dict, userID=userID) elif operation == 'delete': return self.delete_stream(request_dict=request_dict, userID=userID) elif operation == 'update': return self.update_stream(userID=userID, request_dict=request_dict) elif operation == 'query': return self.query_stream(userID=userID) elif operation == 'getDelVideo': return self.get_del_video(userID=userID, request_dict=request_dict) elif operation == 'getAllVideo': return self.get_all_video(request_dict=request_dict, userID=userID) elif operation == 'getVodUrl': return self.get_vod_url(request_dict=request_dict) elif operation == 'getVodHls': return self.get_vod_hls(request_dict=request_dict) elif operation == 'getAdminAllStream': return self.get_admin_all_stream(request_dict=request_dict, userID=userID) elif operation == 'getAdminDelStream': return self.get_admin_del_stream(request_dict=request_dict, userID=userID) elif operation == 'getAdminUpdateStream': return self.get_admin_update_stream(request_dict=request_dict, userID=userID) elif operation == 'getAdminVodUrl': return self.get_admin_vod_url(request_dict=request_dict, userID=userID) elif operation == 'getAdminFindStream': return self.get_admin_find_stream(request_dict=request_dict, userID=userID) elif operation == 'getAdminDelVideo': return self.get_admin_del_video(request_dict=request_dict, userID=userID) elif operation == 'getAllServerJson': return self.get_all_server_json(userID=userID) elif operation == 'getAdminHlsVod': return self.get_admin_hls_vod(userID=userID, request_dict=request_dict) elif operation == 'getAdminAddStream': return self.get_admin_add_stream(userID=userID, request_dict=request_dict) else: return ResponseJSON(444) else: return HttpResponse(tokenManager.errorCodeInfo(error_code)) else: return ResponseJSON(311) def add_stream(self, token, userID, request_dict): channel = request_dict.get('channel', None) status = request_dict.get('status', None) uid = request_dict.get('uid', None) flag_param = CommonService.get_param_flag(data=[channel, status, uid]) if flag_param is True: is_user = ModelService.check_own_device(userID=userID, UID=uid) if is_user is True: stream_queryset = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, rank=1) if stream_queryset.exists(): return ResponseJSON(174) try: flag = Equipment_Stream.objects.create(userID=userID, status=status, uid=uid, channel=channel, access_token=token) except Exception as e: errorInfo = traceback.format_exc() print(errorInfo) return ResponseJSON(424, errorInfo) else: if flag: rtmp_url = RTMP_PUSH_URL rtmp_name = uid + '_' + channel + '?userID=' + userID + '&uid=' + uid + '&channel=' + channel + '&access_token=' + token return ResponseJSON(0, {'userID': userID, 'channel': channel, 'uid': uid, 'status': status, 'access_token': token, 'rtmp_url': rtmp_url, 'rtmp_name': rtmp_name}) else: return ResponseJSON(14) else: return ResponseJSON(444) def delete_stream(self, request_dict, userID): id_list = request_dict.getlist('id', None) if len(id_list): for id in id_list: Equipment_Stream.objects.filter(id=id, userID=userID).delete() return ResponseJSON(0) return ResponseJSON(444) def update_stream(self, userID, request_dict): id = request_dict.get('id', None) status = request_dict.get('status', None) is_update = Equipment_Stream.objects.filter(id=id, userID=userID).update(status=status) if is_update: return ResponseJSON(0,{'id': id, 'status': status}) return ResponseJSON(444) def query_stream(self, userID): equipment_stream_queryset = Equipment_Stream.objects.filter(userID=userID) if equipment_stream_queryset.exists(): send_josn = CommonService.query_set_to_dict(equipment_stream_queryset) return HttpResponse(CommonService.response_formal( data={'code': 0, 'reason': u'Success', 'result': send_josn})) return ResponseJSON(444) def get_all_video(self, request_dict, userID): uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) if uid is not None and channel is not None: own_permission = ModelService.check_permission(userID=userID, permID=30) own_device = ModelService.check_own_device(userID=userID, UID=uid) if own_permission is True or own_device is True: s3 = S3ClassObject() data = s3.get_all_object(prefix=uid + '_' + channel + '/flv') # data = s3.get_all_object(prefix=uid + '_' + channel + '/') return ResponseJSON(0, {'files': data}) else: return ResponseJSON(404) return ResponseJSON(444) def get_vod_url(self, request_dict): uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) filename = request_dict.get('filename', None) if uid is not None and filename is not None and channel is not None: s3 = S3ClassObject() url = s3.get_generate_vod_url(uid + '_' + channel + '/' + filename) return ResponseJSON(0, {'url': url}) else: return ResponseJSON(444) def get_admin_all_stream(self, request_dict, userID): # 判断 own_permission = ModelService.check_permission(userID=userID, permID=30) if own_permission is True: page = int(request_dict.get('page', None)) line = int(request_dict.get('line', None)) equipment_stream_queryset = Equipment_Stream.objects.all() if equipment_stream_queryset.exists(): equipment_stream_count = equipment_stream_queryset.count() equipment_stream_res = equipment_stream_queryset[(page - 1) * line:page * line] send_json = CommonService.query_set_to_dict(equipment_stream_res) send_json['count'] = equipment_stream_count send_json = CommonService.query_set_to_dict(equipment_stream_queryset) return ResponseJSON(0, send_json) else: return ResponseJSON(0) else: return ResponseJSON(404) def get_admin_del_stream(self, request_dict, userID): # 判断 own_permission = ModelService.check_permission(userID=userID, permID=10) if own_permission is True: id_list = request_dict.getlist('id', None) if len(id_list): for id in id_list: Equipment_Stream.objects.filter(id=id).delete() return ResponseJSON(0) else: return ResponseJSON(444) return ResponseJSON(404) def get_admin_del_video(self, request_dict, userID): # 判断 own_permission = ModelService.check_permission(userID=userID, permID=10) if own_permission is True: key_list = request_dict.getlist('key', None) if len(key_list): uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) if uid is not None and channel is not None: re_uid = re.compile(r'^' + uid + '_' + str(channel)) re_flag = False print(key_list) for keys in key_list: re_flag = re_uid.match(keys) if re_flag is not True: break print(re_flag) if re_flag: s3 = S3ClassObject() response = s3.del_object_list(keylist=key_list) if response['Deleted']: return ResponseJSON(0, {'deleted': response['Deleted']}) return ResponseJSON(444) def get_del_video(self, request_dict, userID): uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) if uid is not None and channel is not None: own_permission = ModelService.check_own_device(userID=userID, UID=uid) if own_permission is True: key_list = request_dict.getlist('key', None) if len(key_list) > 0 and key_list is not None: re_uid = re.compile(r'^' + uid + '_' + str(channel)) re_flag = False print(key_list) for keys in key_list: re_flag = re_uid.match(keys) if re_flag is not True: break if re_flag: s3 = S3ClassObject() response = s3.del_object_list(keylist=key_list) if response['Deleted']: return ResponseJSON(0) else: return ResponseJSON(404) return ResponseJSON(444) def get_admin_update_stream(self, userID, request_dict): own_permission = ModelService.check_permission(userID=userID, permID=50) if own_permission is True: id = request_dict.get('id', None) status = request_dict.get('status', None) param_flag = CommonService.get_param_flag(data=[status, id]) if param_flag is True: is_update = Equipment_Stream.objects.filter(id=id).update(status=status) if is_update: return ResponseJSON(0, {'id': id, 'status': status}) else: return ResponseJSON(444) else: return ResponseJSON(404) def get_admin_vod_url(self, userID, request_dict): own_permission = ModelService.check_permission(userID=userID, permID=30) if own_permission is True: uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) filename = request_dict.get('filename', None) if uid is not None and filename is not None and channel is not None: s3 = S3ClassObject() url = s3.get_generate_vod_url(uid + '_' + channel + '/' + filename) return ResponseJSON(0, {'url': url}) else: return ResponseJSON(444) else: return ResponseJSON(404) def get_admin_find_stream(self, userID, request_dict): own_permission = ModelService.check_permission(userID=userID, permID=20) if own_permission is True: content = request_dict.get('content', None) page = int(request_dict.get('page', None)) line = int(request_dict.get('line', None)) param_flag = CommonService.get_param_flag(data=[content, page, line]) if param_flag is True: content = json.loads(content) search_kwargs = CommonService.get_kwargs(data=content) equipment_stream_queryset = Equipment_Stream.objects.filter(**search_kwargs) if equipment_stream_queryset.exists(): equipment_stream_count = equipment_stream_queryset.count() equipment_stream_res = equipment_stream_queryset[(page - 1) * line:page * line] send_json = CommonService.query_set_to_dict(equipment_stream_res) send_json['count'] = equipment_stream_count return ResponseJSON(0, send_json) else: return ResponseJSON(0) else: return ResponseJSON(444) else: return ResponseJSON(404) def get_all_server_json(self, userID): own_permission = ModelService.check_permission(userID=userID, permID=20) if own_permission is True: url = NGINX_RTMP_STAT res = requests.get(url) xml_content = res.text xml_dict = xmltodict.parse(xml_content) if len(xml_dict): return ResponseJSON(0, {'data': xml_dict}) else: return ResponseJSON(404) def get_admin_add_stream(self, token, userID, request_dict): channel = request_dict.get('channel', None) status = request_dict.get('status', None) uid = request_dict.get('uid', None) flag_param = CommonService.get_param_flag(data=[channel, status, uid]) if flag_param is True: own_perm = ModelService.check_permission(userID=userID, permID=40) if own_perm is True: stream_queryset = Equipment_Stream.objects.filter(uid=uid, channel=channel) if stream_queryset.exists(): return ResponseJSON(174) try: flag = Equipment_Stream.objects.create(userID=userID, status=status, uid=uid, channel=channel, access_token=token) except Exception as e: errorInfo = traceback.format_exc() print(errorInfo) return ResponseJSON(424, {'details': errorInfo}) else: if flag: rtmp_url = RTMP_PUSH_URL rtmp_name = uid + '_' + channel + '?userID=' + userID + '&uid=' + uid + '&channel=' + channel + '&access_token=' + token return ResponseJSON(0, {'userID': userID, 'channel': channel, 'uid': uid, 'status': status, 'access_token': token, 'rtmp_url': rtmp_url, 'rtmp_name': rtmp_name}) else: return ResponseJSON(404) else: return ResponseJSON(444) def get_admin_hls_vod(self, request_dict, userID): own_permission = ModelService.check_permission(userID=userID, permID=30) if own_permission is True: uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) filename = request_dict.get('filename', None) param_flag = CommonService.get_param_flag(data=[uid, filename, channel]) if param_flag is True: uid_channel = uid + '_' + channel InputKey = uid_channel + '/flv/' + filename ts = filename.split('.')[0] vod_key = uid_channel + '/' + ts elastictranscoder = ElasticTranscoder() res = elastictranscoder.create_job( # InputKey='2N1K3LE78TYJ38CE111A_99/flv/1526882855.flv', InputKey=InputKey, # OutputKey='vod/2N1K3LE78TYJ38CE111A_99/1526882855' OutputKey='vod/' + vod_key ) if res is True: domain_prefix = 'http://d3596w5a6euckc.cloudfront.net/vod' vod_url = domain_prefix + '/' + vod_key + '.m3u8' return ResponseJSON(0, {'vodurl': vod_url}) else: return ResponseJSON(403) else: return ResponseJSON(444) else: return ResponseJSON(404) def get_vod_hls(self, request_dict): uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) filename = request_dict.get('filename', None) param_flag = CommonService.get_param_flag(data=[uid, filename, channel]) if param_flag is True: uid_channel = uid + '_' + channel InputKey = uid_channel + '/flv/' + filename ts = filename.split('.')[0] vod_key = uid_channel + '/' + ts elastictranscoder = ElasticTranscoder() res = elastictranscoder.create_job( InputKey=InputKey, OutputKey='vod/' + vod_key ) if res is True: domain_prefix = 'http://d3596w5a6euckc.cloudfront.net/vod' vod_url = domain_prefix + '/' + vod_key + '.m3u8' vod_cookie_url = domain_prefix + '/' + uid_channel + '/*' expires_at = int(time.time()) + 7200 cf = BetterThanBoto() vodCookie = cf.create_signed_cookies(url=vod_cookie_url, keypair_id="APKAINI6BNPKV54NHH7Q", expires_at=expires_at, private_key_file=os.path.join(BASE_DIR, 'Ansjer/file/pk-APKAINI6BNPKV54NHH7Q.pem')) return ResponseJSON(0, {'vodurl': vod_url,'vodCookie': vodCookie}) else: return ResponseJSON(403) else: return ResponseJSON(444) # 推流验证 @csrf_exempt def Auth_Stream(request, *callback_args, **callback_kwargs): if request.method == 'GET': request.encoding = 'utf-8' request_dict = request.GET if request.method == 'POST': request.encoding = 'utf-8' request_dict = request.POST access_token = request_dict.get('access_token', None) userID = request_dict.get('userID', None) uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) param_flag = CommonService.get_param_flag(data=[access_token, userID, uid, channel]) if param_flag is True: own_permission = ModelService.check_permission(userID=userID, permID=60) if own_permission is True: equipment_stream_queryset = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, status=1, access_token=access_token) if equipment_stream_queryset.exists(): return HttpResponse(status=200) return HttpResponse(status=404) # 播放验证 @csrf_exempt def Auth_Live(request, *callback_args, **callback_kwargs): if request.method == 'GET': request.encoding = 'utf-8' request_dict = request.GET if request.method == 'POST': request.encoding = 'utf-8' request_dict = request.POST token = request_dict.get('token', None) if token is not None: tokenManager = JSONTokenManager() error_code = tokenManager.verify_AToken(token) if error_code == 0: userID = request_dict.get('userID', None) uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) if userID is not None and uid is not None and channel is not None: own_permission = ModelService.check_permission(userID=userID, permID=90) if own_permission is True: equipment_stream_queryset = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, status=1) if equipment_stream_queryset.exists(): return HttpResponse(status=200) return HttpResponse(status=404) @csrf_exempt def send_video_s3(request, *callback_args, **callback_kwargs): if request.method == 'GET': request.encoding = 'utf-8' request_dict = request.GET if request.method == 'POST': request.encoding = 'utf-8' request_dict = request.POST # 文件名 basename = request_dict.get('basename', None) print(basename) if basename != None: base_path = '/tmp/flv/' file_base_path = base_path + basename input_path = file_base_path + '.flv' output_path = file_base_path + '.mp4' if os.path.exists(input_path): uid_channel = basename.split('-')[0] uid = uid_channel.split('_')[0] channel = uid_channel.split('_')[1] equipment_stream_queryset = Equipment_Stream.objects.filter(channel=channel, uid=uid, status=1) if equipment_stream_queryset.exists(): try: key = uid_channel + '/' + basename + '.mp4' # 转换到 ff = FFmpeg( inputs={input_path: None}, outputs={output_path: ['-vcodec', 'copy', '-strict', '-2']} ) ff.run() # 截取第一帧当缩略图 # ffmpeg -i 555.flv -y -f image2 -ss 08.010 -t 0.001 -s 352x240 b.jpg except Exception as e: errorInfo = traceback.format_exc() print(errorInfo) else: if os.path.exists(output_path): s3 = S3ClassObject() s3_response = s3.put_mp4_object(body=open(output_path, 'rb'), key=key) print(s3_response) if s3_response['ResponseMetadata']['HTTPStatusCode'] == 200: # 记录保存文件容量,由于此操作过于频繁入库,上线代码去掉 file_size = CommonService.get_file_size(file_path=output_path, suffix_type='MB', decimal_point=2) size = equipment_stream_queryset[0].total_flow if size: pass else: size = 0 equipment_stream_queryset.update(total_flow=size + file_size) # rm_common = 'rm ' + input_path + ' ' + output_path subprocess.Popen(rm_common, shell=True) return HttpResponse(status=200) else: rm_common = 'rm ' + input_path + ' ' + output_path print('respone error') subprocess.Popen(rm_common, shell=True) return HttpResponse(status=403) else: return ResponseJSON(444) @csrf_exempt def push_flv_s3(request, *callback_args, **callback_kwargs): if request.method == 'GET': request.encoding = 'utf-8' request_dict = request.GET if request.method == 'POST': request.encoding = 'utf-8' request_dict = request.POST # 文件名 basename = request_dict.get('basename', None) # print(basename) if basename != None: base_path = 'tmp/flv/' file_base_path = base_path + basename flv_path = file_base_path + '.flv' if os.path.exists(flv_path): dict_file = basename.split('-') uid_channel = dict_file[0] file_time = dict_file[1] uid = uid_channel.split('_')[0] channel = uid_channel.split('_')[1] equipment_stream_queryset = Equipment_Stream.objects.filter(channel=channel, uid=uid, status=1) if equipment_stream_queryset.exists(): try: jpg_path = file_base_path + '.jpg' # 截取第一帧当缩略图ffmpeg -i 2N1K3LE78TYJ38CE111A_99-1526882169.flv -y -f image2 -t 0.001 -s 352x240 a.jpg ff = FFmpeg( inputs={flv_path: None}, outputs={jpg_path: ['-y', '-f', 'image2', '-t', '0.001', '-s', '352x240']} ) ff.cmd ff.run() except Exception as e: errorInfo = traceback.format_exc() print(errorInfo) else: if os.path.exists(jpg_path): s3 = S3ClassObject() flv_key = uid_channel + '/flv/' + file_time + '.flv' jpg_key = uid_channel + '/jpg/' + file_time + '.jpg' try: flv_response = s3.put_object(body=open(flv_path, 'rb'), key=flv_key) jpg_response = s3.put_object(body=open(jpg_path, 'rb'), key=jpg_key) if flv_response['ResponseMetadata']['HTTPStatusCode'] == 200 and \ jpg_response['ResponseMetadata']['HTTPStatusCode'] == 200: code = 0 else: code = 444 except Exception as e: print(repr(e)) code = 444 pass path_list = [flv_path, jpg_path] for i in path_list: try: os.remove(i) except Exception as e: print(repr(e)) pass return ResponseJSON(code) return ResponseJSON(444)