import json
import os
import re
import subprocess
import time
import traceback

import requests
import xmltodict
from django.views.generic.base import View
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from ffmpy import FFmpeg

from Service.TokenManager import JSONTokenManager
from Service.ModelService import ModelService
from Model.models import Equipment_Stream
from Service.CommonService import CommonService
from Object.AWS.S3ClassObject import S3ClassObject
from Object.AWS.ElasticTranscoder import ElasticTranscoder
from Object.AWS.CloudfrontSignCookie import BetterThanBoto
# The star imports stay last so that any name they export keeps taking
# precedence over the explicit imports above, exactly as before.
from Ansjer.config import *
from Service.ResponseService import *

'''
Mobile client =>
Add:
http://192.168.136.40:8077/media/stream?token=test&channel=1&status=1&uid=1&operation=add
Delete:
http://192.168.136.40:8077/media/stream?token=test&id=11&id=12&operation=delete
Update:
http://192.168.136.40:8077/media/stream?token=test&id=13&operation=update&status=0
Query:
http://192.168.136.40:8077/media/stream?token=test&operation=query
Get the video playback URL
http://13.56.215.252:82/media/stream?token=test&channel=99&uid=2N1K3LE78TYJ38CE111A&filename=1526882855.flv&operation=getVodHls
http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&operation=getVodUrl&filename=1_1-1523247439.mp4
List every video stored in S3
http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&operation=getAllVideo
Delete device videos; repeat the `key` parameter to delete several at once
http://192.168.136.40:8077/media/stream?token=test&key=UKPAH63V23U4ZHEB111A_1/UKPAH63V23U4ZHEB111A_1-1524039164.mp4&uid=UKPAH63V23U4ZHEB111A&channel=1&operation=getDelVideo
----------------------------------------------------------------------------------------------------
Admin backend =>
Verify stream-push permission
http://192.168.136.40:8077/media/auth_stream?userID=151547867345163613800138001&uid=1&channel=1&access_token=test
Verify playback permission (live)
http://192.168.136.40:8077/media/auth_live?userID=151547867345163613800138001&uid=1&channel=1&token=test
rtmp://192.168.136.45:1935/hls/UKPAH63V23U4ZHEB111A_88?userID=151547867
List information about every pushing device
http://192.168.136.40:8077/media/stream?token=test&page=1&line=10&operation=getAdminAllStream
Delete streams
http://192.168.136.40:8077/media/stream?token=test&id=1&id=2&id=3&operation=getAdminDelStream
Delete videos of a given device
http://192.168.136.40:8077/media/stream?token=test&key=1&key=2&key=3&operation=getAdminDelVideo
Update
http://192.168.136.40:8077/media/stream?token=test&id=13&operation=getAdminUpdateStream&status=0
Get the playback URL of a video stored for a given device
http://192.168.136.40:8077/media/stream?token=test&channel=1&uid=1&filename=1_1-1523247439.mp4&operation=getAdminVodUrl
HLS playback
http://13.56.215.252:82/media/stream?token=test&channel=1&status=1&uid=2N1K3LE78TYJ38CE111A&operation=add&rank=1
'''

# CloudFront distribution prefix under which transcoded HLS playlists are served.
_VOD_DOMAIN_PREFIX = 'http://d3596w5a6euckc.cloudfront.net/vod'
# Key pair used to sign the CloudFront cookies returned by getVodHls.
_CLOUDFRONT_KEYPAIR_ID = "APKAINI6BNPKV54NHH7Q"
# Path of the matching private key, joined onto BASE_DIR at call time.
_CLOUDFRONT_PRIVATE_KEY = 'Ansjer/file/pk-APKAINI6BNPKV54NHH7Q.pem'


def _get_request_dict(request):
    """Return request.GET or request.POST, or None for any other HTTP method."""
    if request.method == 'GET':
        request.encoding = 'utf-8'
        return request.GET
    if request.method == 'POST':
        request.encoding = 'utf-8'
        return request.POST
    return None


def _parse_paging(request_dict):
    """Return (page, line) as positive ints, or None when missing or invalid."""
    try:
        page = int(request_dict.get('page', None))
        line = int(request_dict.get('line', None))
    except (TypeError, ValueError):
        return None
    # page < 1 would produce a negative slice start, which querysets reject.
    if page < 1 or line < 1:
        return None
    return page, line


def _keys_belong_to(key_list, uid, channel):
    """Return True when key_list is non-empty and EVERY key starts with 'uid_channel/'."""
    prefix = uid + '_' + str(channel) + '/'
    return bool(key_list) and all(key.startswith(prefix) for key in key_list)


def _push_stream_payload(userID, uid, channel, status, token):
    """Build the success payload shared by add_stream and get_admin_add_stream."""
    rtmp_name = '{uid}_{channel}?userID={userID}&uid={uid}&channel={channel}&access_token={token}'.format(
        uid=uid, channel=channel, userID=userID, token=token)
    return {'userID': userID, 'channel': channel, 'uid': uid, 'status': status,
            'access_token': token, 'rtmp_url': RTMP_PUSH_URL, 'rtmp_name': rtmp_name}


def _create_hls_job(uid, channel, filename):
    """Start a transcode job for the stored flv.

    Returns (uid_channel, vod_url) when ElasticTranscoder.create_job returns
    True, otherwise None.
    """
    uid_channel = uid + '_' + channel
    input_key = uid_channel + '/flv/' + filename
    # The part of the file name before the first '.' names the output playlist.
    vod_key = uid_channel + '/' + filename.split('.')[0]
    job_ok = ElasticTranscoder().create_job(InputKey=input_key, OutputKey='vod/' + vod_key)
    if job_ok is not True:
        return None
    return uid_channel, _VOD_DOMAIN_PREFIX + '/' + vod_key + '.m3u8'


def _is_safe_basename(basename):
    """Return True when basename is a plain file name (no directory parts)."""
    return bool(basename) and basename not in ('.', '..') and os.path.basename(basename) == basename


def _split_uid_channel(uid_channel):
    """Split 'uid_channel' on '_' and return (uid, channel), or None if there is no '_'."""
    parts = uid_channel.split('_')
    if len(parts) < 2:
        return None
    return parts[0], parts[1]


def _remove_files(paths):
    """Best-effort removal of temporary files; failures are printed, not raised."""
    for path in paths:
        try:
            os.remove(path)
        except OSError as e:
            print(repr(e))


class StreamMedia(View):
    """Token-protected endpoint that dispatches on the `operation` parameter."""

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        return super(StreamMedia, self).dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request_dict=request.GET)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        return self.validation(request_dict=request.POST)

    def validation(self, request_dict, *args, **kwargs):
        """Verify the token and permission 60, then run the requested operation.

        Returns ResponseJSON(311) when no token is given, the token manager's
        error info when verification fails, ResponseJSON(404) when permission
        60 is missing, and ResponseJSON(444) for an unknown operation.
        """
        token = request_dict.get('token', None)
        if token is None:
            return ResponseJSON(311)
        tokenManager = JSONTokenManager()
        error_code = tokenManager.verify_AToken(token)
        if error_code != 0:
            return HttpResponse(tokenManager.errorCodeInfo(error_code))
        userID = tokenManager.accessDict.get('userID', None)
        if ModelService.check_permission(userID=userID, permID=60) is not True:
            return ResponseJSON(404)

        # Lambdas defer the call so only the selected handler runs.
        handlers = {
            'add': lambda: self.add_stream(token=token, request_dict=request_dict, userID=userID),
            'delete': lambda: self.delete_stream(request_dict=request_dict, userID=userID),
            'update': lambda: self.update_stream(userID=userID, request_dict=request_dict),
            'query': lambda: self.query_stream(userID=userID),
            'getDelVideo': lambda: self.get_del_video(userID=userID, request_dict=request_dict),
            'getAllVideo': lambda: self.get_all_video(request_dict=request_dict, userID=userID),
            'getVodUrl': lambda: self.get_vod_url(request_dict=request_dict),
            'getVodHls': lambda: self.get_vod_hls(request_dict=request_dict),
            'getAdminAllStream': lambda: self.get_admin_all_stream(request_dict=request_dict, userID=userID),
            'getAdminDelStream': lambda: self.get_admin_del_stream(request_dict=request_dict, userID=userID),
            'getAdminUpdateStream': lambda: self.get_admin_update_stream(request_dict=request_dict, userID=userID),
            'getAdminVodUrl': lambda: self.get_admin_vod_url(request_dict=request_dict, userID=userID),
            'getAdminFindStream': lambda: self.get_admin_find_stream(request_dict=request_dict, userID=userID),
            'getAdminDelVideo': lambda: self.get_admin_del_video(request_dict=request_dict, userID=userID),
            'getAllServerJson': lambda: self.get_all_server_json(userID=userID),
            'getAdminHlsVod': lambda: self.get_admin_hls_vod(userID=userID, request_dict=request_dict),
            # get_admin_add_stream requires the token; it was previously omitted,
            # which raised TypeError on every call.
            'getAdminAddStream': lambda: self.get_admin_add_stream(
                token=token, userID=userID, request_dict=request_dict),
        }
        handler = handlers.get(request_dict.get('operation', None))
        if handler is None:
            return ResponseJSON(444)
        return handler()

    def add_stream(self, token, userID, request_dict):
        """Create a stream record for a device owned by userID and return push details."""
        channel = request_dict.get('channel', None)
        status = request_dict.get('status', None)
        uid = request_dict.get('uid', None)
        if CommonService.get_param_flag(data=[channel, status, uid]) is not True:
            return ResponseJSON(444)
        if ModelService.check_own_device(userID=userID, UID=uid) is not True:
            return ResponseJSON(14)
        existing = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, rank=1)
        if existing.exists():
            return ResponseJSON(174)
        try:
            Equipment_Stream.objects.create(userID=userID, status=status, uid=uid, channel=channel,
                                            access_token=token)
        except Exception:
            errorInfo = traceback.format_exc()
            print(errorInfo)
            # NOTE(review): this sends the full traceback to the client - confirm that is intended.
            return ResponseJSON(424, errorInfo)
        return ResponseJSON(0, _push_stream_payload(userID, uid, channel, status, token))

    def delete_stream(self, request_dict, userID):
        """Delete the caller's stream records whose ids are given by repeated `id` params."""
        id_list = request_dict.getlist('id', None)
        if not id_list:
            return ResponseJSON(444)
        Equipment_Stream.objects.filter(id__in=id_list, userID=userID).delete()
        return ResponseJSON(0)

    def update_stream(self, userID, request_dict):
        """Update `status` on one of the caller's stream records."""
        stream_id = request_dict.get('id', None)
        status = request_dict.get('status', None)
        if CommonService.get_param_flag(data=[status, stream_id]) is not True:
            return ResponseJSON(444)
        updated = Equipment_Stream.objects.filter(id=stream_id, userID=userID).update(status=status)
        if updated:
            return ResponseJSON(0, {'id': stream_id, 'status': status})
        return ResponseJSON(444)

    def query_stream(self, userID):
        """Return every stream record belonging to userID, or 444 when there are none."""
        equipment_stream_queryset = Equipment_Stream.objects.filter(userID=userID)
        if not equipment_stream_queryset.exists():
            return ResponseJSON(444)
        send_json = CommonService.query_set_to_dict(equipment_stream_queryset)
        return HttpResponse(CommonService.response_formal(
            data={'code': 0, 'reason': u'Success', 'result': send_json}))

    def get_all_video(self, request_dict, userID):
        """List the S3 objects under '<uid>_<channel>/flv' for an owner or a holder of permission 30."""
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        if uid is None or channel is None:
            return ResponseJSON(444)
        own_permission = ModelService.check_permission(userID=userID, permID=30)
        own_device = ModelService.check_own_device(userID=userID, UID=uid)
        if own_permission is not True and own_device is not True:
            return ResponseJSON(404)
        data = S3ClassObject().get_all_object(prefix=uid + '_' + channel + '/flv')
        return ResponseJSON(0, {'files': data})

    def get_vod_url(self, request_dict):
        """Return a generated playback URL for '<uid>_<channel>/<filename>'."""
        # NOTE(review): no device-ownership check is made here - confirm that is intended.
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if uid is None or filename is None or channel is None:
            return ResponseJSON(444)
        url = S3ClassObject().get_generate_vod_url(uid + '_' + channel + '/' + filename)
        return ResponseJSON(0, {'url': url})

    def get_admin_all_stream(self, request_dict, userID):
        """Return one page of all stream records plus the total count (needs permission 30)."""
        # Permission check
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return ResponseJSON(404)
        paging = _parse_paging(request_dict)
        if paging is None:
            return ResponseJSON(444)
        page, line = paging
        equipment_stream_queryset = Equipment_Stream.objects.all()
        if not equipment_stream_queryset.exists():
            return ResponseJSON(0)
        total = equipment_stream_queryset.count()
        page_rows = equipment_stream_queryset[(page - 1) * line:page * line]
        # The page and its count are returned as built; they were previously
        # overwritten by a second conversion of the whole queryset.
        send_json = CommonService.query_set_to_dict(page_rows)
        send_json['count'] = total
        return ResponseJSON(0, send_json)

    def get_admin_del_stream(self, request_dict, userID):
        """Delete stream records by id regardless of owner (needs permission 10)."""
        # Permission check
        if ModelService.check_permission(userID=userID, permID=10) is not True:
            return ResponseJSON(404)
        id_list = request_dict.getlist('id', None)
        if not id_list:
            return ResponseJSON(444)
        Equipment_Stream.objects.filter(id__in=id_list).delete()
        return ResponseJSON(0)

    def get_admin_del_video(self, request_dict, userID):
        """Delete S3 objects of one device/channel (needs permission 10).

        Every key must start with '<uid>_<channel>/'; otherwise nothing is
        deleted and ResponseJSON(444) is returned.
        """
        # Permission check
        if ModelService.check_permission(userID=userID, permID=10) is not True:
            return ResponseJSON(404)
        key_list = request_dict.getlist('key', None)
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        if uid is None or channel is None or not _keys_belong_to(key_list, uid, channel):
            return ResponseJSON(444)
        response = S3ClassObject().del_object_list(keylist=key_list)
        deleted = response.get('Deleted')
        if deleted:
            return ResponseJSON(0, {'deleted': deleted})
        return ResponseJSON(444)

    def get_del_video(self, request_dict, userID):
        """Delete S3 objects of a device owned by the caller.

        Every key must start with '<uid>_<channel>/'; otherwise nothing is
        deleted and ResponseJSON(444) is returned.
        """
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        if uid is None or channel is None:
            return ResponseJSON(444)
        if ModelService.check_own_device(userID=userID, UID=uid) is not True:
            return ResponseJSON(404)
        key_list = request_dict.getlist('key', None)
        if not _keys_belong_to(key_list, uid, channel):
            return ResponseJSON(444)
        response = S3ClassObject().del_object_list(keylist=key_list)
        if response.get('Deleted'):
            return ResponseJSON(0)
        return ResponseJSON(444)

    def get_admin_update_stream(self, userID, request_dict):
        """Update `status` on any stream record (needs permission 50)."""
        if ModelService.check_permission(userID=userID, permID=50) is not True:
            return ResponseJSON(404)
        stream_id = request_dict.get('id', None)
        status = request_dict.get('status', None)
        if CommonService.get_param_flag(data=[status, stream_id]) is not True:
            return ResponseJSON(444)
        updated = Equipment_Stream.objects.filter(id=stream_id).update(status=status)
        if updated:
            return ResponseJSON(0, {'id': stream_id, 'status': status})
        # No row matched: answer explicitly instead of returning None.
        return ResponseJSON(444)

    def get_admin_vod_url(self, userID, request_dict):
        """Return a generated playback URL for a stored video (needs permission 30)."""
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return ResponseJSON(404)
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if uid is None or filename is None or channel is None:
            return ResponseJSON(444)
        url = S3ClassObject().get_generate_vod_url(uid + '_' + channel + '/' + filename)
        return ResponseJSON(0, {'url': url})

    def get_admin_find_stream(self, userID, request_dict):
        """Search stream records with filters given as JSON in `content` (needs permission 20)."""
        if ModelService.check_permission(userID=userID, permID=20) is not True:
            return ResponseJSON(404)
        content = request_dict.get('content', None)
        paging = _parse_paging(request_dict)
        if paging is None:
            return ResponseJSON(444)
        page, line = paging
        if CommonService.get_param_flag(data=[content, page, line]) is not True:
            return ResponseJSON(444)
        try:
            filters = json.loads(content)
        except (TypeError, ValueError):
            # Malformed JSON is a bad parameter, not a server error.
            return ResponseJSON(444)
        search_kwargs = CommonService.get_kwargs(data=filters)
        equipment_stream_queryset = Equipment_Stream.objects.filter(**search_kwargs)
        if not equipment_stream_queryset.exists():
            return ResponseJSON(0)
        total = equipment_stream_queryset.count()
        page_rows = equipment_stream_queryset[(page - 1) * line:page * line]
        send_json = CommonService.query_set_to_dict(page_rows)
        send_json['count'] = total
        return ResponseJSON(0, send_json)

    def get_all_server_json(self, userID):
        """Fetch NGINX_RTMP_STAT, parse the XML and return it as a dict (needs permission 20)."""
        if ModelService.check_permission(userID=userID, permID=20) is not True:
            return ResponseJSON(404)
        try:
            # A timeout keeps an unresponsive stat endpoint from blocking the worker.
            res = requests.get(NGINX_RTMP_STAT, timeout=10)
        except requests.RequestException as e:
            print(repr(e))
            return ResponseJSON(444)
        xml_dict = xmltodict.parse(res.text)
        if not xml_dict:
            return ResponseJSON(444)
        return ResponseJSON(0, {'data': xml_dict})

    def get_admin_add_stream(self, token, userID, request_dict):
        """Create a stream record for any device (needs permission 40) and return push details."""
        channel = request_dict.get('channel', None)
        status = request_dict.get('status', None)
        uid = request_dict.get('uid', None)
        if CommonService.get_param_flag(data=[channel, status, uid]) is not True:
            return ResponseJSON(444)
        if ModelService.check_permission(userID=userID, permID=40) is not True:
            return ResponseJSON(404)
        if Equipment_Stream.objects.filter(uid=uid, channel=channel).exists():
            return ResponseJSON(174)
        try:
            Equipment_Stream.objects.create(userID=userID, status=status, uid=uid, channel=channel,
                                            access_token=token)
        except Exception:
            errorInfo = traceback.format_exc()
            print(errorInfo)
            # NOTE(review): this sends the full traceback to the client - confirm that is intended.
            return ResponseJSON(424, {'details': errorInfo})
        return ResponseJSON(0, _push_stream_payload(userID, uid, channel, status, token))

    def get_admin_hls_vod(self, request_dict, userID):
        """Start an HLS transcode of a stored flv and return its playlist URL (needs permission 30)."""
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return ResponseJSON(404)
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if CommonService.get_param_flag(data=[uid, filename, channel]) is not True:
            return ResponseJSON(444)
        job = _create_hls_job(uid, channel, filename)
        if job is None:
            return ResponseJSON(403)
        return ResponseJSON(0, {'vodurl': job[1]})

    def get_vod_hls(self, request_dict):
        """Start an HLS transcode and return the playlist URL with signed CloudFront cookies."""
        # NOTE(review): no device-ownership check is made here - confirm that is intended.
        uid = request_dict.get('uid', None)
        channel = request_dict.get('channel', None)
        filename = request_dict.get('filename', None)
        if CommonService.get_param_flag(data=[uid, filename, channel]) is not True:
            return ResponseJSON(444)
        job = _create_hls_job(uid, channel, filename)
        if job is None:
            return ResponseJSON(403)
        uid_channel, vod_url = job
        # The cookie covers every object under this device/channel for two hours.
        vod_cookie_url = _VOD_DOMAIN_PREFIX + '/' + uid_channel + '/*'
        expires_at = int(time.time()) + 7200
        vodCookie = BetterThanBoto().create_signed_cookies(
            url=vod_cookie_url,
            keypair_id=_CLOUDFRONT_KEYPAIR_ID,
            expires_at=expires_at,
            private_key_file=os.path.join(BASE_DIR, _CLOUDFRONT_PRIVATE_KEY))
        return ResponseJSON(0, {'vodurl': vod_url, 'vodCookie': vodCookie})


# Stream-push authorization
@csrf_exempt
def Auth_Stream(request, *callback_args, **callback_kwargs):
    """Answer 200 when an enabled stream record matches the given credentials, else 404."""
    request_dict = _get_request_dict(request)
    if request_dict is None:
        # Neither GET nor POST: deny instead of failing on an unbound variable.
        return HttpResponse(status=404)
    access_token = request_dict.get('access_token', None)
    userID = request_dict.get('userID', None)
    uid = request_dict.get('uid', None)
    channel = request_dict.get('channel', None)
    if CommonService.get_param_flag(data=[access_token, userID, uid, channel]) is not True:
        return HttpResponse(status=404)
    if ModelService.check_permission(userID=userID, permID=60) is not True:
        return HttpResponse(status=404)
    matching = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, status=1,
                                               access_token=access_token)
    if matching.exists():
        return HttpResponse(status=200)
    return HttpResponse(status=404)


# Playback authorization
@csrf_exempt
def Auth_Live(request, *callback_args, **callback_kwargs):
    """Answer 200 when the token verifies and an enabled stream record matches, else 404."""
    request_dict = _get_request_dict(request)
    if request_dict is None:
        return HttpResponse(status=404)
    token = request_dict.get('token', None)
    if token is None:
        return HttpResponse(status=404)
    tokenManager = JSONTokenManager()
    if tokenManager.verify_AToken(token) != 0:
        return HttpResponse(status=404)
    # NOTE(review): userID is read from the request, not from the verified token - confirm.
    userID = request_dict.get('userID', None)
    uid = request_dict.get('uid', None)
    channel = request_dict.get('channel', None)
    if userID is None or uid is None or channel is None:
        return HttpResponse(status=404)
    if ModelService.check_permission(userID=userID, permID=90) is not True:
        return HttpResponse(status=404)
    matching = Equipment_Stream.objects.filter(userID=userID, uid=uid, channel=channel, status=1)
    if matching.exists():
        return HttpResponse(status=200)
    return HttpResponse(status=404)


@csrf_exempt
def send_video_s3(request, *callback_args, **callback_kwargs):
    """Remux /tmp/flv/<basename>.flv to mp4, upload it to S3 and remove the local files.

    Returns ResponseJSON(444) when `basename` is missing, HttpResponse 200
    after a successful upload and HttpResponse 403 on any other failure.
    """
    request_dict = _get_request_dict(request)
    if request_dict is None:
        return ResponseJSON(444)
    # File name
    basename = request_dict.get('basename', None)
    print(basename)
    if basename is None:
        return ResponseJSON(444)
    # basename comes from the request: refuse anything with directory parts.
    if not _is_safe_basename(basename):
        return HttpResponse(status=403)

    file_base_path = '/tmp/flv/' + basename
    input_path = file_base_path + '.flv'
    output_path = file_base_path + '.mp4'
    if not os.path.exists(input_path):
        return HttpResponse(status=403)

    uid_channel = basename.split('-')[0]
    uid_and_channel = _split_uid_channel(uid_channel)
    if uid_and_channel is None:
        return HttpResponse(status=403)
    uid, channel = uid_and_channel
    equipment_stream_queryset = Equipment_Stream.objects.filter(channel=channel, uid=uid, status=1)
    if not equipment_stream_queryset.exists():
        return HttpResponse(status=403)

    key = uid_channel + '/' + basename + '.mp4'
    try:
        # Convert flv to mp4 without re-encoding the video stream.
        ff = FFmpeg(
            inputs={input_path: None},
            outputs={output_path: ['-vcodec', 'copy', '-strict', '-2']}
        )
        ff.run()
    except Exception:
        print(traceback.format_exc())
        return HttpResponse(status=403)
    if not os.path.exists(output_path):
        return HttpResponse(status=403)

    # The context manager closes the handle once the upload call returns.
    with open(output_path, 'rb') as body:
        s3_response = S3ClassObject().put_mp4_object(body=body, key=key)
    print(s3_response)
    uploaded = s3_response['ResponseMetadata']['HTTPStatusCode'] == 200
    if uploaded:
        # Record the stored file size. This writes to the database very
        # frequently, so it should be removed from production code.
        file_size = CommonService.get_file_size(file_path=output_path, suffix_type='MB', decimal_point=2)
        size = equipment_stream_queryset[0].total_flow or 0
        equipment_stream_queryset.update(total_flow=size + file_size)
    else:
        print('respone error')
    # Files are removed with os.remove rather than a shell `rm`, so the
    # request-supplied name never reaches a shell.
    _remove_files([input_path, output_path])
    return HttpResponse(status=200 if uploaded else 403)


@csrf_exempt
def push_flv_s3(request, *callback_args, **callback_kwargs):
    """Upload tmp/flv/<basename>.flv and a thumbnail of it to S3, then remove both files.

    Returns ResponseJSON(0) when both uploads answer HTTP 200 and
    ResponseJSON(444) otherwise.
    """
    request_dict = _get_request_dict(request)
    if request_dict is None:
        return ResponseJSON(444)
    # File name
    basename = request_dict.get('basename', None)
    if basename is None or not _is_safe_basename(basename):
        return ResponseJSON(444)

    # NOTE(review): this path is relative ('tmp/flv/') while send_video_s3 uses
    # '/tmp/flv/' - confirm which one is intended.
    file_base_path = 'tmp/flv/' + basename
    flv_path = file_base_path + '.flv'
    if not os.path.exists(flv_path):
        return ResponseJSON(444)

    # Expected form: '<uid>_<channel>-<time>'.
    name_parts = basename.split('-')
    if len(name_parts) < 2:
        return ResponseJSON(444)
    uid_channel = name_parts[0]
    file_time = name_parts[1]
    uid_and_channel = _split_uid_channel(uid_channel)
    if uid_and_channel is None:
        return ResponseJSON(444)
    uid, channel = uid_and_channel
    equipment_stream_queryset = Equipment_Stream.objects.filter(channel=channel, uid=uid, status=1)
    if not equipment_stream_queryset.exists():
        return ResponseJSON(444)

    jpg_path = file_base_path + '.jpg'
    try:
        # Grab the first frame as a thumbnail, equivalent to:
        # ffmpeg -i <name>.flv -y -f image2 -t 0.001 -s 352x240 <name>.jpg
        ff = FFmpeg(
            inputs={flv_path: None},
            outputs={jpg_path: ['-y', '-f', 'image2', '-t', '0.001', '-s', '352x240']}
        )
        ff.run()
    except Exception:
        print(traceback.format_exc())
        return ResponseJSON(444)
    if not os.path.exists(jpg_path):
        return ResponseJSON(444)

    s3 = S3ClassObject()
    flv_key = uid_channel + '/flv/' + file_time + '.flv'
    jpg_key = uid_channel + '/jpg/' + file_time + '.jpg'
    try:
        with open(flv_path, 'rb') as flv_body:
            flv_response = s3.put_object(body=flv_body, key=flv_key)
        with open(jpg_path, 'rb') as jpg_body:
            jpg_response = s3.put_object(body=jpg_body, key=jpg_key)
        both_ok = (flv_response['ResponseMetadata']['HTTPStatusCode'] == 200 and
                   jpg_response['ResponseMetadata']['HTTPStatusCode'] == 200)
        code = 0 if both_ok else 444
    except Exception as e:
        print(repr(e))
        code = 444
    # The local files are removed whether or not the upload succeeded.
    _remove_files([flv_path, jpg_path])
    return ResponseJSON(code)