"""Stream-media HTTP endpoints: stream records, S3 recordings and RTMP callbacks.

Example requests
================

Mobile client =>
  Create:
    http://192.168.136.40:8077/media/stream?token=test&channel=1&status=1&uid=1&operation=add
  Delete:
    http://192.168.136.40:8077/media/stream?token=test&id=11&id=12&operation=delete
  Update:
    http://192.168.136.40:8077/media/stream?token=test&id=13&operation=update&status=0
  Query:
    http://192.168.136.40:8077/media/stream?token=test&operation=query
  Get a video playback address:
    http://13.56.215.252:82/media/stream?token=test&channel=99&uid=2N1K3LE78TYJ38CE111A&filename=1526882855.flv&operation=getVodHls
    http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&operation=getVodUrl&filename=1_1-1523247439.mp4
  List every video stored in S3:
    http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&operation=getAllVideo
  Delete several device videos (pass ``key`` once per video):
    http://192.168.136.40:8077/media/stream?token=test&key=UKPAH63V23U4ZHEB111A_1/UKPAH63V23U4ZHEB111A_1-1524039164.mp4&uid=UKPAH63V23U4ZHEB111A&channel=1&operation=getDelVideo

----------------------------------------------------------------------------

Back office =>
  Verify permission to push a stream:
    http://192.168.136.40:8077/media/auth_stream?userID=151547867345163613800138001&uid=1&channel=1&access_token=test
  Verify permission to play (live):
    http://192.168.136.40:8077/media/auth_live?userID=151547867345163613800138001&uid=1&channel=1&token=test
    rtmp://192.168.136.45:1935/hls/UKPAH63V23U4ZHEB111A_88?userID=151547867
  List every device that pushes a stream:
    http://192.168.136.40:8077/media/stream?token=test&page=1&line=10&operation=getAdminAllStream
  Delete streams:
    http://192.168.136.40:8077/media/stream?token=test&id=1&id=2&id=3&operation=getAdminDelStream
  Delete videos of a given device:
    http://192.168.136.40:8077/media/stream?token=test&key=1&key=2&key=3&operation=getAdminDelVideo
  Update:
    http://192.168.136.40:8077/media/stream?token=test&id=13&operation=getAdminUpdateStream&status=0
  Get the playback address of a video stored for a given device:
    http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&filename=1_1-1523247439.mp4&operation=getAdminVodUrl
  HLS playback:
    http://13.56.215.252:82/media/stream?token=test&channel=1&status=1&uid=2N1K3LE78TYJ38CE111A&operation=add&rank=1
"""
import os
import re
import subprocess
import time
import traceback

import requests
import simplejson as json
import xmltodict
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
from ffmpy import FFmpeg

from Ansjer.config import NGINX_RTMP_STAT, RTMP_PUSH_URL
from Ansjer.settings import BASE_DIR
from Model.models import Equipment_Stream
from Object.AWS.CloudfrontSignCookie import BetterThanBoto
from Object.AWS.ElasticTranscoder import ElasticTranscoder
from Object.AWS.S3ClassObject import S3ClassObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
from Service.ModelService import ModelService

# CloudFront distribution that serves the transcoded HLS output.
_VOD_DOMAIN_PREFIX = 'http://d3596w5a6euckc.cloudfront.net/vod'
# Key pair used to sign the CloudFront cookies returned by getVodHls.
_CLOUDFRONT_KEYPAIR_ID = 'APKAINI6BNPKV54NHH7Q'
_CLOUDFRONT_PRIVATE_KEY = os.path.join(BASE_DIR, 'Ansjer/file/pk-APKAINI6BNPKV54NHH7Q.pem')
# Lifetime, in seconds, of the signed CloudFront cookies.
_VOD_COOKIE_TTL = 7200
# Seconds to wait for the nginx-rtmp statistics page before giving up.
_NGINX_STAT_TIMEOUT = 10


def _request_params(request):
    """Return the GET or POST parameters of *request*, or None for other methods."""
    if request.method == 'GET':
        request.encoding = 'utf-8'
        return request.GET
    if request.method == 'POST':
        request.encoding = 'utf-8'
        return request.POST
    return None


def _parse_paging(request_dict):
    """Return ``(page, line)`` as positive ints, or None when missing or invalid."""
    try:
        page = int(request_dict.get('page', None))
        line = int(request_dict.get('line', None))
    except (TypeError, ValueError):
        return None
    # A page below 1 would produce a negative queryset slice.
    if page < 1 or line < 1:
        return None
    return page, line


def _split_basename(basename):
    """Split a recording name shaped like ``<uid>_<channel>-<timestamp>``.

    Returns ``(uid_channel, uid, channel, file_time)``; ``file_time`` is None
    when the name has no ``-`` part. Returns None when *basename* is empty,
    contains a directory component, or has no ``_`` in its first part.
    """
    # The name is joined onto a directory below, so it must be a bare file name.
    if not basename or os.path.basename(basename) != basename:
        return None
    dash_parts = basename.split('-')
    uid_channel = dash_parts[0]
    file_time = dash_parts[1] if len(dash_parts) > 1 else None
    underscore_parts = uid_channel.split('_')
    if len(underscore_parts) < 2:
        return None
    return uid_channel, underscore_parts[0], underscore_parts[1], file_time


def _remove_files(*paths):
    """Delete each path; a failure is printed and does not stop the others."""
    for path in paths:
        try:
            os.remove(path)
        except OSError as e:
            print(repr(e))


class StreamMedia(View):
    """Token-authenticated view that routes ``operation`` to a handler method."""

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        return super(StreamMedia, self).dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request_dict=request.GET)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request_dict=request.POST)

    def validation(self, request_dict, *args, **kwargs):
        """Validate the token and permission 60, then run the requested operation.

        Responds with code 311 when ``token`` is absent, with the token's own
        code when it is not 0, with 404 when permission 60 is not held and
        with 444 for an unknown ``operation``.
        """
        response = ResponseObject()
        token = request_dict.get('token', None)
        if token is None:
            return response.json(311)
        tko = TokenObject(token)
        tko.valid()
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        userID = tko.userID()
        # The check used to be inverted (holders of the permission got 404).
        if ModelService.check_permission(userID=userID, permID=60) is not True:
            return response.json(404)

        handlers = {
            'add': lambda: self.add_stream(
                token=token, request_dict=request_dict, userID=userID, response=response),
            'delete': lambda: self.delete_stream(
                request_dict=request_dict, userID=userID, response=response),
            'update': lambda: self.update_stream(
                userID=userID, request_dict=request_dict, response=response),
            'query': lambda: self.query_stream(userID=userID, response=response),
            'getDelVideo': lambda: self.get_del_video(
                userID=userID, request_dict=request_dict, response=response),
            'getAllVideo': lambda: self.get_all_video(
                request_dict=request_dict, userID=userID, response=response),
            'getVodUrl': lambda: self.get_vod_url(
                request_dict=request_dict, response=response),
            'getVodHls': lambda: self.get_vod_hls(
                request_dict=request_dict, response=response),
            'getAdminAllStream': lambda: self.get_admin_all_stream(
                request_dict=request_dict, userID=userID, response=response),
            'getAdminDelStream': lambda: self.get_admin_del_stream(
                request_dict=request_dict, userID=userID, response=response),
            'getAdminUpdateStream': lambda: self.get_admin_update_stream(
                request_dict=request_dict, userID=userID, response=response),
            'getAdminVodUrl': lambda: self.get_admin_vod_url(
                request_dict=request_dict, userID=userID, response=response),
            'getAdminFindStream': lambda: self.get_admin_find_stream(
                request_dict=request_dict, userID=userID, response=response),
            'getAdminDelVideo': lambda: self.get_admin_del_video(
                request_dict=request_dict, userID=userID, response=response),
            'getAllServerJson': lambda: self.get_all_server_json(
                userID=userID, response=response),
            'getAdminHlsVod': lambda: self.get_admin_hls_vod(
                userID=userID, request_dict=request_dict, response=response),
            # ``token`` is a required argument of get_admin_add_stream.
            'getAdminAddStream': lambda: self.get_admin_add_stream(
                token=token, userID=userID, request_dict=request_dict, response=response),
        }
        handler = handlers.get(request_dict.get('operation', None))
        if handler is None:
            return response.json(444, 'operation')
        return handler()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _push_info(token, userID, uid, channel, status):
        """Build the payload describing where and how to push the stream."""
        rtmp_name = (uid + '_' + channel + '?userID=' + userID + '&uid=' + uid
                     + '&channel=' + channel + '&access_token=' + token)
        return {'userID': userID, 'channel': channel, 'uid': uid, 'status': status,
                'access_token': token, 'rtmp_url': RTMP_PUSH_URL, 'rtmp_name': rtmp_name}

    @staticmethod
    def _delete_s3_keys(key_list, uid, channel):
        """Delete *key_list* from S3 when every key starts with ``<uid>_<channel>``.

        Returns the ``Deleted`` entry of the S3 result, or None when a key
        fails the prefix check or nothing was reported as deleted.
        """
        prefix = uid + '_' + str(channel)
        # Every key is checked, not just the first one.
        if not all(key.startswith(prefix) for key in key_list):
            return None
        s3 = S3ClassObject()
        # presumably a boto3-style dict; confirm against S3ClassObject
        s3_response = s3.del_object_list(keylist=key_list)
        return s3_response.get('Deleted')

    @staticmethod
    def _vod_url(uid, channel, filename):
        """Return the S3 playback URL for ``<uid>_<channel>/<filename>``."""
        s3 = S3ClassObject()
        return s3.get_generate_vod_url(uid + '_' + channel + '/' + filename)

    @staticmethod
    def _start_hls_job(uid, channel, filename):
        """Start a transcoding job for a stored flv recording.

        Returns ``(uid_channel, vod_url)`` when the job was created, else None.
        """
        uid_channel = uid + '_' + channel
        # e.g. InputKey '2N1K3LE78TYJ38CE111A_99/flv/1526882855.flv'
        #      OutputKey 'vod/2N1K3LE78TYJ38CE111A_99/1526882855'
        vod_key = uid_channel + '/' + filename.split('.')[0]
        res = ElasticTranscoder().create_job(
            InputKey=uid_channel + '/flv/' + filename,
            OutputKey='vod/' + vod_key
        )
        if res is not True:
            return None
        return uid_channel, _VOD_DOMAIN_PREFIX + '/' + vod_key + '.m3u8'

    # ------------------------------------------------------------------
    # Mobile-client operations
    # ------------------------------------------------------------------
    def add_stream(self, token, userID, request_dict, response):
        """Create a stream record for a device owned by the user.

        Responds 444 when ``channel``/``status``/``uid`` fail the parameter
        check, 14 when the user does not own the device, 174 when a matching
        rank-1 record already exists and 424 when the insert raises.
        """
        channel = request_dict.get('channel', None)
        status = request_dict.get('status', None)
        uid = request_dict.get('uid', None)
        if CommonService.get_param_flag(data=[channel, status, uid]) is not True:
            return response.json(444, 'channel, status, uid')
        if ModelService.check_own_device(userID=userID, UID=uid) is not True:
            return response.json(14)
        existing = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, rank=1)
        if existing.exists():
            return response.json(174)
        try:
            Equipment_Stream.objects.create(userID=userID, status=status, uid=uid, channel=channel,
                                            access_token=token)
        except Exception:
            errorInfo = traceback.format_exc()
            print(errorInfo)
            return response.json(424, errorInfo)
        return response.json(0, self._push_info(token, userID, uid, channel, status))

    def delete_stream(self, request_dict, userID, response):
        """Delete the user's own stream records named by the ``id`` values."""
        id_list = request_dict.getlist('id', None)
        if not id_list:
            return response.json(444)
        Equipment_Stream.objects.filter(id__in=id_list, userID=userID).delete()
        return response.json(0)

    def update_stream(self, userID, request_dict, response):
        """Set ``status`` on one of the user's stream records; 444 if none matched."""
        stream_id = request_dict.get('id', None)
        status = request_dict.get('status', None)
        updated = Equipment_Stream.objects.filter(id=stream_id, userID=userID).update(status=status)
        if updated:
            return response.json(0, {'id': stream_id, 'status': status})
        return response.json(444)

    def query_stream(self, userID, response):
        """Return every stream record that belongs to the user."""
        streams = Equipment_Stream.objects.filter(userID=userID)
        if not streams.exists():
            return response.json(0)
        return response.json(0, CommonService.qs_to_dict(streams))

    def get_all_video(self, request_dict, userID, response):
        """List the S3 objects stored under ``<uid>_<channel>/flv``.

        Allowed for holders of permission 30 and for owners of the device.
        """
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        if uid is None or channel is None:
            return response.json(444, 'uid,channel')
        own_permission = ModelService.check_permission(userID=userID, permID=30)
        own_device = ModelService.check_own_device(userID=userID, UID=uid)
        if own_permission is not True and own_device is not True:
            return response.json(404)
        s3 = S3ClassObject()
        data = s3.get_all_object(prefix=uid + '_' + channel + '/flv')
        return response.json(0, {'files': data})

    def get_vod_url(self, request_dict, response):
        """Return the playback URL of a stored video."""
        # NOTE(review): no device-ownership check is made here - confirm intended.
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if uid is None or filename is None or channel is None:
            return response.json(444)
        return response.json(0, {'url': self._vod_url(uid, channel, filename)})

    def get_del_video(self, request_dict, userID, response):
        """Delete S3 videos of a device the user owns.

        Every ``key`` must start with ``<uid>_<channel>``. Responds 404 when
        the user does not own the device and 444 for any other failure.
        """
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        if uid is None or channel is None:
            return response.json(444)
        if ModelService.check_own_device(userID=userID, UID=uid) is not True:
            return response.json(404)
        key_list = request_dict.getlist('key', None)
        if not key_list:
            return response.json(444)
        print(key_list)
        if self._delete_s3_keys(key_list, uid, channel):
            return response.json(0)
        return response.json(444)

    def get_vod_hls(self, request_dict, response):
        """Start HLS transcoding and return its URL with signed CloudFront cookies.

        Responds 444 when a parameter fails the check and 403 when the
        transcoding job could not be created.
        """
        # NOTE(review): no device-ownership check is made here - confirm intended.
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if CommonService.get_param_flag(data=[uid, filename, channel]) is not True:
            return response.json(444)
        job = self._start_hls_job(uid, channel, filename)
        if job is None:
            return response.json(403)
        uid_channel, vod_url = job
        # The cookie covers everything below the device/channel prefix.
        vod_cookie_url = _VOD_DOMAIN_PREFIX + '/' + uid_channel + '/*'
        expires_at = int(time.time()) + _VOD_COOKIE_TTL
        vodCookie = BetterThanBoto().create_signed_cookies(
            url=vod_cookie_url,
            keypair_id=_CLOUDFRONT_KEYPAIR_ID,
            expires_at=expires_at,
            private_key_file=_CLOUDFRONT_PRIVATE_KEY)
        return response.json(0, {'vodurl': vod_url, 'vodCookie': vodCookie})

    # ------------------------------------------------------------------
    # Back-office operations
    # ------------------------------------------------------------------
    def get_admin_all_stream(self, request_dict, userID, response):
        """Return one page of all stream records plus the total ``count``.

        Requires permission 30 (else 404); responds 444 when ``page`` or
        ``line`` is missing or not a positive integer.
        """
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return response.json(404)
        paging = _parse_paging(request_dict)
        if paging is None:
            return response.json(444)
        page, line = paging
        streams = Equipment_Stream.objects.all()
        if not streams.exists():
            return response.json(0)
        send_json = CommonService.qs_to_dict(streams[(page - 1) * line:page * line])
        send_json['count'] = streams.count()
        return response.json(0, send_json)

    def get_admin_del_stream(self, request_dict, userID, response):
        """Delete stream records by ``id``; requires permission 10 (else 404)."""
        if ModelService.check_permission(userID=userID, permID=10) is not True:
            return response.json(404)
        id_list = request_dict.getlist('id', None)
        if not id_list:
            return response.json(444)
        Equipment_Stream.objects.filter(id__in=id_list).delete()
        return response.json(0)

    def get_admin_del_video(self, request_dict, userID, response):
        """Delete S3 videos of any device; requires permission 10 (else 404).

        Every ``key`` must start with ``<uid>_<channel>``; responds 444 when
        parameters are missing, a key fails that check or nothing was deleted.
        """
        if ModelService.check_permission(userID=userID, permID=10) is not True:
            return response.json(404)
        key_list = request_dict.getlist('key', None)
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        if not key_list or uid is None or channel is None:
            return response.json(444)
        print(key_list)
        deleted = self._delete_s3_keys(key_list, uid, channel)
        if deleted:
            return response.json(0, {'deleted': deleted})
        return response.json(444)

    def get_admin_update_stream(self, userID, request_dict, response):
        """Set ``status`` on any stream record; requires permission 50 (else 404)."""
        if ModelService.check_permission(userID=userID, permID=50) is not True:
            return response.json(404)
        stream_id = request_dict.get('id', None)
        status = request_dict.get('status', None)
        if CommonService.get_param_flag(data=[status, stream_id]) is not True:
            return response.json(444)
        updated = Equipment_Stream.objects.filter(id=stream_id).update(status=status)
        if updated:
            return response.json(0, {'id': stream_id, 'status': status})
        return response.json(444)

    def get_admin_vod_url(self, userID, request_dict, response):
        """Return the playback URL of a stored video; requires permission 30."""
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return response.json(404)
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if uid is None or filename is None or channel is None:
            return response.json(444)
        return response.json(0, {'url': self._vod_url(uid, channel, filename)})

    def get_admin_find_stream(self, userID, request_dict, response):
        """Search stream records with the JSON filter in ``content``, paginated.

        Requires permission 20 (else 404); responds 444 when ``page``/``line``
        are invalid, a parameter fails the check or ``content`` is not JSON.
        """
        if ModelService.check_permission(userID=userID, permID=20) is not True:
            return response.json(404)
        content = request_dict.get('content', None)
        paging = _parse_paging(request_dict)
        if paging is None:
            return response.json(444)
        page, line = paging
        if CommonService.get_param_flag(data=[content, page, line]) is not True:
            return response.json(444)
        try:
            content = json.loads(content)
        except (TypeError, ValueError):
            return response.json(444)
        search_kwargs = CommonService.get_kwargs(data=content)
        streams = Equipment_Stream.objects.filter(**search_kwargs)
        if not streams.exists():
            return response.json(0)
        send_json = CommonService.qs_to_dict(streams[(page - 1) * line:page * line])
        send_json['count'] = streams.count()
        return response.json(0, send_json)

    def get_all_server_json(self, userID, response):
        """Return the nginx-rtmp statistics page parsed into a dict.

        Requires permission 20 (else 404).
        """
        if ModelService.check_permission(userID=userID, permID=20) is not True:
            return response.json(404)
        # Without a timeout an unresponsive stat server would block the worker.
        res = requests.get(NGINX_RTMP_STAT, timeout=_NGINX_STAT_TIMEOUT)
        xml_dict = xmltodict.parse(res.text)
        return response.json(0, {'data': xml_dict})

    def get_admin_add_stream(self, token, userID, request_dict, response):
        """Create a stream record for any device; requires permission 40.

        Responds 444 when ``channel``/``status``/``uid`` fail the parameter
        check, 404 without the permission, 174 when a record for the
        uid/channel already exists and 424 when the insert raises.
        """
        channel = request_dict.get('channel', None)
        status = request_dict.get('status', None)
        uid = request_dict.get('uid', None)
        if CommonService.get_param_flag(data=[channel, status, uid]) is not True:
            return response.json(444)
        if ModelService.check_permission(userID=userID, permID=40) is not True:
            return response.json(404)
        if Equipment_Stream.objects.filter(uid=uid, channel=channel).exists():
            return response.json(174)
        try:
            Equipment_Stream.objects.create(userID=userID, status=status, uid=uid, channel=channel,
                                            access_token=token)
        except Exception:
            errorInfo = traceback.format_exc()
            print(errorInfo)
            return response.json(424, {'details': errorInfo})
        return response.json(0, self._push_info(token, userID, uid, channel, status))

    def get_admin_hls_vod(self, request_dict, userID, response):
        """Start HLS transcoding of a stored flv and return the playlist URL.

        Requires permission 30 (else 404); responds 444 when a parameter fails
        the check and 403 when the transcoding job could not be created.
        """
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return response.json(404)
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if CommonService.get_param_flag(data=[uid, filename, channel]) is not True:
            return response.json(444)
        job = self._start_hls_job(uid, channel, filename)
        if job is None:
            return response.json(403)
        return response.json(0, {'vodurl': job[1]})


# Push-stream authorization
@csrf_exempt
def Auth_Stream(request, *callback_args, **callback_kwargs):
    """Answer 200 when the caller may push the given stream, otherwise 404.

    The user must hold permission 60 and an active (status 1) stream record
    must match userID, uid, channel and access_token. Methods other than
    GET/POST get 405.
    """
    request_dict = _request_params(request)
    if request_dict is None:
        return HttpResponse(status=405)
    access_token = request_dict.get('access_token', None)
    userID = request_dict.get('userID', None)
    uid = request_dict.get('uid', None)
    channel = request_dict.get('channel', None)
    if CommonService.get_param_flag(data=[access_token, userID, uid, channel]) is True:
        if ModelService.check_permission(userID=userID, permID=60) is True:
            streams = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel,
                                                      status=1, access_token=access_token)
            if streams.exists():
                return HttpResponse(status=200)
    return HttpResponse(status=404)


# Playback authorization
@csrf_exempt
def Auth_Live(request, *callback_args, **callback_kwargs):
    """Answer 200 when the token's user may play the given stream, otherwise 404.

    The token must validate with code 0, the user must hold permission 90 and
    an active (status 1) stream record must match userID, uid and channel.
    Methods other than GET/POST get 405.
    """
    request_dict = _request_params(request)
    if request_dict is None:
        return HttpResponse(status=405)
    token = request_dict.get('token', None)
    if token is None:
        return HttpResponse(status=404)
    tko = TokenObject(token)
    tko.valid()
    if tko.code != 0:
        return HttpResponse(status=404)
    userID = tko.userID()
    uid = request_dict.get('uid', None)
    channel = request_dict.get('channel', None)
    if userID is not None and uid is not None and channel is not None:
        if ModelService.check_permission(userID=userID, permID=90) is True:
            streams = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel,
                                                      status=1)
            if streams.exists():
                return HttpResponse(status=200)
    return HttpResponse(status=404)


@csrf_exempt
def send_video_s3(request, *callback_args, **callback_kwargs):
    """Convert ``/tmp/flv/<basename>.flv`` to mp4, upload it to S3 and clean up.

    Answers 200 after a successful upload, 403 when S3 did not answer 200,
    500 when the conversion raised, 405 for methods other than GET/POST and
    404 otherwise (bad ``basename``, missing file, no active stream record).
    """
    request_dict = _request_params(request)
    if request_dict is None:
        return HttpResponse(status=405)
    # File name without extension
    basename = request_dict.get('basename', None)
    print(basename)
    parts = _split_basename(basename)
    if parts is None:
        return HttpResponse(status=404)
    uid_channel, uid, channel, _ = parts

    file_base_path = '/tmp/flv/' + basename
    input_path = file_base_path + '.flv'
    output_path = file_base_path + '.mp4'
    if not os.path.exists(input_path):
        return HttpResponse(status=404)
    equipment_stream_queryset = Equipment_Stream.objects.filter(channel=channel, uid=uid, status=1)
    if not equipment_stream_queryset.exists():
        return HttpResponse(status=404)

    key = uid_channel + '/' + basename + '.mp4'
    try:
        # Convert the flv recording to mp4
        ff = FFmpeg(
            inputs={input_path: None},
            outputs={output_path: ['-vcodec', 'copy', '-strict', '-2']}
        )
        ff.run()
    except Exception:
        print(traceback.format_exc())
        return HttpResponse(status=500)
    if not os.path.exists(output_path):
        return HttpResponse(status=404)

    s3 = S3ClassObject()
    with open(output_path, 'rb') as mp4_file:
        s3_response = s3.put_mp4_object(body=mp4_file, key=key)
    print(s3_response)
    uploaded = s3_response['ResponseMetadata']['HTTPStatusCode'] == 200
    if uploaded:
        # Record the stored file size. This writes to the database very
        # frequently; remove it from production code.
        file_size = CommonService.get_file_size(file_path=output_path, suffix_type='MB',
                                                decimal_point=2)
        size = equipment_stream_queryset[0].total_flow or 0
        equipment_stream_queryset.update(total_flow=size + file_size)
    else:
        print('respone error')
    # os.remove replaces the former "rm ..." shell command, which embedded the
    # request-supplied basename in a shell string.
    _remove_files(input_path, output_path)
    return HttpResponse(status=200 if uploaded else 403)


@csrf_exempt
def push_flv_s3(request, *callback_args, **callback_kwargs):
    """Upload ``tmp/flv/<basename>.flv`` and a thumbnail of it to S3.

    Responds with code 0 when both uploads answer 200 and with 444 otherwise;
    methods other than GET/POST get HTTP 405. Both local files are removed
    once an upload has been attempted.
    """
    request_dict = _request_params(request)
    if request_dict is None:
        return HttpResponse(status=405)
    response = ResponseObject()
    # File name without extension
    basename = request_dict.get('basename', None)
    parts = _split_basename(basename)
    if parts is None or parts[3] is None:
        return response.json(444)
    uid_channel, uid, channel, file_time = parts

    # NOTE(review): relative path, unlike '/tmp/flv/' in send_video_s3 - confirm.
    file_base_path = 'tmp/flv/' + basename
    flv_path = file_base_path + '.flv'
    if not os.path.exists(flv_path):
        return response.json(444)
    equipment_stream_queryset = Equipment_Stream.objects.filter(channel=channel, uid=uid, status=1)
    if not equipment_stream_queryset.exists():
        return response.json(444)

    jpg_path = file_base_path + '.jpg'
    try:
        # Grab the first frame as the thumbnail, equivalent to:
        # ffmpeg -i 2N1K3LE78TYJ38CE111A_99-1526882169.flv -y -f image2 -t 0.001 -s 352x240 a.jpg
        ff = FFmpeg(
            inputs={flv_path: None},
            outputs={jpg_path: ['-y', '-f', 'image2', '-t', '0.001', '-s', '352x240']}
        )
        ff.run()
    except Exception:
        print(traceback.format_exc())
        return response.json(444)
    if not os.path.exists(jpg_path):
        return response.json(444)

    s3 = S3ClassObject()
    flv_key = uid_channel + '/flv/' + file_time + '.flv'
    jpg_key = uid_channel + '/jpg/' + file_time + '.jpg'
    try:
        with open(flv_path, 'rb') as flv_file:
            flv_response = s3.put_object(body=flv_file, key=flv_key)
        with open(jpg_path, 'rb') as jpg_file:
            jpg_response = s3.put_object(body=jpg_file, key=jpg_key)
        both_ok = (flv_response['ResponseMetadata']['HTTPStatusCode'] == 200
                   and jpg_response['ResponseMetadata']['HTTPStatusCode'] == 200)
        code = 0 if both_ok else 444
    except Exception as e:
        print(repr(e))
        code = 444
    _remove_files(flv_path, jpg_path)
    return response.json(code)