#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: AnsjerFormal
@software: PyCharm
@DATE: 2018/12/5 9:30
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: CloudVod.py
@Contact: chanjunkai@163.com
"""
import json
import logging
import math
import time
import urllib
import urllib.parse

import oss2
import paypalrestsdk
from aliyunsdkcore import client
from aliyunsdksts.request.v20150401 import AssumeRoleRequest
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View

from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, OSS_ROLE_ARN, SERVER_DOMAIN
from Model.models import Device_Info, Order_Model, Store_Meal, VodHlsModel, OssCrdModel, UID_Bucket
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Object.UidTokenObject import UidTokenObject
from Service.CommonService import CommonService

logger = logging.getLogger(__name__)

'''
# 获取设备推送hls流 证书
http://192.168.136.40:8077/cloudVod/getSts?uidToken=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1aWQiOiJGVFNMTDhITTQzN1ozOFdVMTExQSIsImNoYW5uZWwiOiI0In0.HO-PzoRwhQ4CFNkjthqOitf48c-XOvHjtNGCeUmBe9g
# 获取存储的播放文件列表
#修改状态
http://192.168.136.40:8077/cloudVod/status?token=local&did=138001380001543918745881545&channel=4&status=1
# 回调vod
http://192.168.136.40:8077/cloudVod/storeplaylist?uidToken=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1aWQiOiJGVFNMTDhITTQzN1ozOFdVMTExQSIsImNoYW5uZWwiOiI0In0.HO-PzoRwhQ4CFNkjthqOitf48c-XOvHjtNGCeUmBe9g&time=1234567891
=============================
# 生成订单
http://test.dvema.com/cloudVod/createOrder?token=test&did=138001380001544514277661990&channel=4&rank=1
# 修改设备云存状态
http://test.dvema.com/cloudVod/status?did=138001380001544514277661990&channel=4&token=test&status=1
# 获取指定设备云存关联信息
http://test.dvema.com/cloudVod/details?token=test&did=138001380001540342559510534
# 获取回放列表
http://test.dvema.com/cloudVod/getHlsList?did=138001380001544514277661990&channel=4&token=test&daytime=2018121001
2设备端
http://test.dvema.com/cloudVod/getSts?uidToken=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjaGFubmVsIjoiNCIsInVpZCI6IkZUU0xMOEhNNDM3WjM4V1UxMTFBIn0.wkrwYvIYf5qEukOSTxALSAgSqop-gNBdEvSwScOgYB8
'''


class CloudVodView(View):
    """Cloud video storage endpoints, selected by the ``operation`` URL kwarg.

    Operations handled without a user token: ``getSts``, ``payExecute``,
    ``storeplaylist``, ``payOK`` and ``payError``.  Operations that require a
    valid user ``token`` parameter: ``getHlsList``, ``createOrder``,
    ``status`` and ``playlist``.
    """

    # NOTE(review): these PayPal credentials are hard-coded in source and the
    # mode is fixed to "sandbox"; they should probably be loaded from project
    # configuration -- confirm with the owner before going live.
    _PAYPAL_CONFIG = {
        "mode": "sandbox",  # sandbox or live
        "client_id": "AfnfDqezODOoWGS-W2Itu-Zl1ay1R95IsGlMqPghPA3KGhkPndNMnQT0bdEewvSv92XAFIfLiinmyhBL",
        "client_secret": "EErLskwYA1xXY3890mHx5OhzgK83B2rNc57zIozGNyKc8i6RJuhPTF9WyhhdZgyDEih0heo1MH9Jk1lj",
    }

    # Lifetime, in seconds, of every signed OSS URL issued by this view.
    _SIGN_URL_EXPIRES = 3600

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request with CSRF protection disabled."""
        return super(CloudVodView, self).dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET by routing the query parameters to ``validation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST by routing the form parameters to ``validation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler, checking the user token when needed.

        Args:
            request_dict: The request parameters (``request.GET`` or ``request.POST``).
            request: The Django request, used to resolve the caller's IP address.
            operation: Operation name taken from the URL; ``None`` yields a 444 response.

        Returns:
            The response produced by the selected handler, or an error response
            (444 for a missing operation, the token error code for an invalid
            token, 414 for an unknown operation).
        """
        response = ResponseObject()
        if operation is None:
            return response.json(444, 'error path')

        # Operations below do not carry a user token.
        if operation == 'getSts':
            # Device asks for STS upload credentials (motion detection upload).
            ip = CommonService.get_ip_address(request)
            return self.do_getSts(request_dict, ip, response)
        if operation == 'payExecute':
            # PayPal redirects here once the payer approved the payment.
            return self.do_paypal_execute(request_dict, response)
        if operation == 'storeplaylist':
            return self.do_store_palylist(request_dict, response)
        if operation == 'payOK':
            return self.do_pay_ok()
        if operation == 'payError':
            return self.do_pay_error()

        # Everything else requires a valid user token.
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        tko.valid()
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        userID = tko.userID
        if operation == 'getHlsList':
            return self.do_get_hls_list(request_dict, userID, response)
        if operation == 'createOrder':
            return self.do_create_order(request_dict, userID, response)
        if operation == 'status':
            return self.do_change_status(request_dict, userID, response)
        if operation == 'playlist':
            return self.do_get_playlist(request_dict, userID, response)
        return response.json(414)

    @classmethod
    def _configure_paypal(cls):
        """Configure the PayPal SDK with this view's credentials."""
        # Pass a copy so the SDK can never mutate the class-level settings.
        paypalrestsdk.configure(dict(cls._PAYPAL_CONFIG))

    @staticmethod
    def _redirect(operation):
        """Return a redirect to ``{SERVER_DOMAIN}cloudVod/<operation>``."""
        red_url = "{SERVER_DOMAIN}cloudVod/{operation}".format(
            SERVER_DOMAIN=SERVER_DOMAIN, operation=operation)
        return HttpResponseRedirect(red_url)

    @classmethod
    def _sign_playback_urls(cls, bucket, m3u8_key, thumb_key):
        """Sign the playlist and thumbnail URLs for one recording.

        Args:
            bucket: The ``oss2.Bucket`` that holds the recording.
            m3u8_key: Object key of the ``.m3u8`` playlist.
            thumb_key: Object key of the segment used for the snapshot.

        Returns:
            A ``(play_url, thumb_url)`` tuple of signed URLs.
        """
        signed = bucket.sign_url('GET', m3u8_key, cls._SIGN_URL_EXPIRES,
                                 params={'x-oss-process': 'hls/sign'})
        # Only the part before '?' is percent-decoded; the query string that
        # carries the signature is passed through untouched.
        url_parts = signed.split('?')
        play_url = '{url_start}?{url_end}'.format(
            url_start=urllib.parse.unquote(url_parts[0]), url_end=url_parts[1])
        thumb_url = bucket.sign_url(
            'GET', thumb_key, cls._SIGN_URL_EXPIRES,
            params={'x-oss-process': 'video/snapshot,t_10000,m_fast,w_300'})
        return play_url, thumb_url

    def do_pay_error(self):
        """Return the page shown after a failed or cancelled payment."""
        response = HttpResponse()
        response.status_code = 200
        # NOTE(review): the page body below looks like markup with its tags
        # missing -- confirm the intended HTML with the owner.
        response.content = ''' # # #
# #嵌入页区域
以拦截跳转链接的方式实现
'''
        return response

    def do_pay_ok(self):
        """Return the page shown after a successful payment."""
        response = HttpResponse()
        # NOTE(review): the page body below looks like markup with its tags
        # missing -- confirm the intended HTML with the owner.
        response.content = ''' # # # # #嵌入页区域
以拦截跳转链接的方式实现
'''
        return response

    def do_create_order(self, request_dict, userID, response):
        """Create a PayPal payment and a matching order for a storage meal.

        Args:
            request_dict: Request parameters; reads ``did``, ``rank`` (meal id)
                and ``channel``.
            userID: Id of the authenticated user; the device must belong to it.
            response: ``ResponseObject`` used to build the reply.

        Returns:
            Code 0 with ``{"redirectUrl": <PayPal approval url>}`` on success;
            444 when a parameter is missing, 13 when the device is not found
            for this user, 10 for an unknown meal, a downgrade attempt or a
            PayPal failure.
        """
        did = request_dict.get('did', None)
        rank = request_dict.get('rank', None)
        channel = request_dict.get('channel', None)
        if not did or not channel or not rank:
            return response.json(444, 'did,channel,rank')

        smqs = Store_Meal.objects.filter(id=rank).values(
            "currency", "price", "content", "day", "bucket__storeDay")
        if not smqs.exists():
            return response.json(10, '套餐不存在')
        meal = smqs[0]
        currency = meal['currency']
        price = meal['price']
        content = meal['content']
        day = meal['day']

        qs = Device_Info.objects.filter(userID_id=userID, id=did).values("UID")
        if not qs.exists():
            return response.json(13)
        uid = qs[0]['UID']

        # Refuse to move an existing subscription to a bucket with fewer storage days.
        ubqs = UID_Bucket.objects.filter(uid=uid, channel=channel).values("bucket__storeDay")
        if ubqs.exists() and ubqs[0]['bucket__storeDay'] > meal['bucket__storeDay']:
            return response.json(10, '不可降级')

        orderID = CommonService.createOrderID()
        call_sub_url = "{SERVER_DOMAIN}cloudVod/payExecute?orderID={orderID}".format(
            SERVER_DOMAIN=SERVER_DOMAIN, orderID=orderID)
        # Built from SERVER_DOMAIN rather than a fixed LAN address so that the
        # payer's browser can actually reach it.
        # NOTE(review): 'cancleorder' has no dedicated branch in validation() -- confirm.
        call_clc_url = "{SERVER_DOMAIN}cloudVod/cancleorder".format(SERVER_DOMAIN=SERVER_DOMAIN)

        self._configure_paypal()
        payment = paypalrestsdk.Payment({
            "intent": "sale",
            "payer": {"payment_method": "paypal"},
            "redirect_urls": {"return_url": call_sub_url, "cancel_url": call_clc_url},
            "transactions": [{
                "item_list": {
                    # The item currency must agree with the amount currency below.
                    "items": [{"name": "Cloud video", "sku": "1", "price": price,
                               "currency": currency, "quantity": 1}]},
                "amount": {"total": price, "currency": currency},
                "description": content
            }]})
        if not payment.create():
            logger.warning("PayPal payment creation failed: %s", payment.error)
            return response.json(10, payment.error)

        nowTime = int(time.time())
        for link in payment.links:
            if link.rel == "approval_url":
                approval_url = str(link.href)
                logger.debug("Redirect for approval: %s", approval_url)
                Order_Model.objects.create(
                    orderID=orderID, UID=uid, channel=channel, userID_id=userID,
                    desc=content, price=price, currency=currency,
                    addTime=nowTime, updTime=nowTime,
                    endTime=nowTime + int(day) * 3600 * 24,
                    rank_id=rank, paypal=approval_url)
                return response.json(0, {"redirectUrl": approval_url})
        return response.json(10, 'generate_order_false')

    def do_getSts(self, request_dict, ip, response):
        """Issue (or reuse) STS upload credentials for a device channel.

        Args:
            request_dict: Request parameters; reads ``uidToken``.
            ip: Caller IP address; echoed back in the reply.
            response: ``ResponseObject`` used for the invalid-token reply.

        Returns:
            A ``JsonResponse`` with the credential payload (``code`` 0), or
            ``{'code': 403}`` when the channel has no active, unexpired
            subscription; a 444 response when the uid token is invalid.
        """
        uidToken = request_dict.get('uidToken', None)
        utko = UidTokenObject(uidToken)
        if utko.flag is False:
            return response.json(444, 'uidToken')
        UID = utko.UID
        channel = utko.channel
        logger.debug("getSts uid=%s channel=%s", UID, channel)

        ubqs = UID_Bucket.objects.filter(uid=UID, channel=channel, status=1).values(
            'channel', 'bucket__bucket', 'bucket__endpoint', 'bucket__region', 'endTime')
        if not ubqs.exists():
            return JsonResponse(status=200, data={'code': 403})
        subscription = ubqs[0]
        now_time = int(time.time())
        if now_time > subscription['endTime']:
            return JsonResponse(status=200, data={'code': 403})

        # Reuse stored credentials while they are younger than 3500 seconds,
        # i.e. slightly less than the 3600 seconds they are requested for.
        oc_qs = OssCrdModel.objects.filter(uid=UID, channel=channel).values("addTime", "data")
        if oc_qs.exists():
            cached = oc_qs[0]
            if int(cached["addTime"]) + 3500 > now_time:
                return JsonResponse(status=200, data=json.loads(cached["data"]))

        storage = '{uid}/vod{channel}/'.format(uid=UID, channel=channel)
        bucket_name = subscription['bucket__bucket']
        endpoint = subscription['bucket__endpoint']
        region_id = subscription['bucket__region']

        clt = client.AcsClient(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, region_id)
        req = AssumeRoleRequest.AssumeRoleRequest()
        req.set_accept_format('json')
        req.set_RoleArn(OSS_ROLE_ARN)
        req.set_RoleSessionName(UID)
        req.set_DurationSeconds(3600)
        # The policy limits the credentials to this device channel's own prefix.
        Resource_access = "acs:oss:*:*:{bucket_name}/{uid_channel}*".format(
            bucket_name=bucket_name, uid_channel=storage)
        policys = {
            "Version": "1",
            "Statement": [
                {
                    "Action": ["oss:PutObject", "oss:DeleteObject", ],
                    "Resource": [Resource_access],
                    "Effect": "Allow",
                    # No source-IP restriction is applied.
                    "Condition": {}
                }
            ]
        }
        req.set_Policy(Policy=json.dumps(policys))
        body = clt.do_action(req)
        # The decoded body holds secrets, so it is deliberately never logged.
        token = json.loads(body.decode('utf-8'))
        credentials = token['Credentials']
        res = {
            'AccessKeyId': credentials['AccessKeyId'],
            'AccessKeySecret': credentials['AccessKeySecret'],
            'SecurityToken': credentials['SecurityToken'],
            'Expiration': credentials['Expiration'],
            'expire': '3600',
            'endpoint': endpoint,
            'bucket_name': bucket_name,
            'arn': token['AssumedRoleUser']['Arn'],
            'code': 0,
            'storage': storage,
            'ip': ip}
        if oc_qs.exists():
            oc_qs.update(data=json.dumps(res), addTime=now_time)
        else:
            OssCrdModel.objects.create(uid=UID, channel=channel,
                                       data=json.dumps(res), addTime=now_time)
        return JsonResponse(status=200, data=res)

    def do_get_hls_list(self, request_dict, userID, response):
        """List the recordings stored under one ``daytime`` folder in OSS.

        Args:
            request_dict: Request parameters; reads ``did``, ``channel`` and
                ``daytime`` (the folder name below ``<uid>/vod<channel>/``).
            userID: Id of the authenticated user; the device must belong to it.
            response: ``ResponseObject`` used to build the reply.

        Returns:
            Code 0 with a list of ``{'name', 'sign_url', 'thumb'}`` dicts;
            444 when a parameter is missing, 13 when the device is not found,
            10 when the channel has no subscription or it has expired.
        """
        did = request_dict.get('did', None)
        channel = request_dict.get('channel', None)
        daytime = request_dict.get('daytime', None)
        if not did or not channel or not daytime:
            return response.json(444, 'did,channel,daytime')

        qs = Device_Info.objects.filter(userID_id=userID, id=did).values("UID")
        if not qs.exists():
            return response.json(13)
        uid = qs[0]['UID']

        ubqs = UID_Bucket.objects.filter(uid=uid, channel=channel) \
            .values('endTime', 'bucket__bucket', 'bucket__endpoint')
        if not ubqs.exists():
            return response.json(10, '未购买')
        subscription = ubqs[0]
        if time.time() > subscription['endTime']:
            return response.json(10, '已过期')

        auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
        bucket = oss2.Bucket(auth, subscription["bucket__endpoint"], subscription["bucket__bucket"])
        prefix = '{uid}/vod{channel}/{daytime}/'.format(uid=uid, channel=channel, daytime=daytime)
        vod_play_list = []
        # max_keys is only the page size of each listing request; the iterator
        # still walks every entry under the prefix.
        for obj in oss2.ObjectIterator(bucket=bucket, prefix=prefix, delimiter='/', max_keys=100):
            # Only sub-folders are recordings; plain files are skipped.
            if not obj.is_prefix():
                continue
            ptime = obj.key.split('/')[3]
            vod_play_url, thumb = self._sign_playback_urls(
                bucket,
                '{prefix}{name}.m3u8'.format(prefix=obj.key, name=ptime),
                '{prefix}ts0.ts'.format(prefix=obj.key))
            vod_play_list.append({'name': ptime, 'sign_url': vod_play_url, 'thumb': thumb})
        return response.json(0, vod_play_list)

    def do_paypal_execute(self, request_dict, response):
        """Execute an approved PayPal payment and extend the subscription.

        Args:
            request_dict: Request parameters; reads ``paymentId``, ``PayerID``
                and ``orderID``.
            response: ``ResponseObject`` used for the JSON replies.

        Returns:
            A redirect to ``payOK`` on success or to ``payError`` when a
            parameter is missing, the order or payment is unknown, or the
            execution fails; a JSON reply when the meal no longer exists or
            the purchase would be a downgrade.
        """
        paymentId = request_dict.get('paymentId', None)
        PayerID = request_dict.get('PayerID', None)
        orderID = request_dict.get('orderID', None)
        if not paymentId or not PayerID or not orderID:
            return self._redirect('payError')

        # Look the order up before charging so an unknown order id never
        # results in a payment being executed.
        # NOTE(review): nothing here ties paymentId to orderID -- confirm that
        # a payment created for one order cannot be used to settle another.
        order_qs = Order_Model.objects.filter(orderID=orderID)
        order_list = order_qs.values("UID", "rank_id", "channel")
        if not order_list.exists():
            return self._redirect('payError')
        order = order_list[0]
        rank_id = order['rank_id']
        UID = order['UID']
        channel = order['channel']

        self._configure_paypal()
        try:
            payment = paypalrestsdk.Payment.find(paymentId)
        except paypalrestsdk.exceptions.ResourceNotFound:
            logger.warning("PayPal payment %s not found", paymentId)
            return self._redirect('payError')
        if not payment.execute({"payer_id": PayerID}):
            logger.warning("PayPal payment execution failed: %s", payment.error)
            return self._redirect('payError')

        order_qs.update(status=1, updTime=int(time.time()))

        smqs = Store_Meal.objects.filter(id=rank_id).values("day", "bucket_id", "bucket__storeDay")
        if not smqs.exists():
            return response.json(0, '套餐已删除')
        meal = smqs[0]
        bucketId = meal['bucket_id']
        addTime = int(meal["day"]) * 24 * 3600

        ubqs = UID_Bucket.objects.filter(uid=UID, channel=channel).values(
            "bucket_id", "endTime", "bucket__storeDay")
        nowTime = int(time.time())
        if not ubqs.exists():
            # First purchase for this device channel.
            UID_Bucket.objects.create(uid=UID, channel=channel, bucket_id=bucketId,
                                      endTime=nowTime + addTime)
            return self._redirect('payOK')

        current = ubqs[0]
        if nowTime > current['endTime']:
            # Expired subscription: restart from now.
            ubqs.update(endTime=nowTime + addTime)
        elif bucketId == current['bucket_id']:
            # Renewal on the same bucket: extend the current end time.
            ubqs.update(endTime=current['endTime'] + addTime)
        elif current['bucket__storeDay'] > meal['bucket__storeDay']:
            return response.json(10, '不可降级')
        else:
            # Upgrade: the remaining time is scaled down by the ratio of
            # storage days before the purchased time is added.
            origin_storeDay = int(current['bucket__storeDay'])
            upgrade_storeDay = int(meal['bucket__storeDay'])
            ctcTime = current['endTime'] - nowTime
            # Guard against a zero storeDay on either side of the ratio.
            multiple = math.ceil(upgrade_storeDay / origin_storeDay) if origin_storeDay > 0 else 1
            multiple = max(1, multiple)
            # Integer division keeps endTime an integer timestamp.
            ubqs.update(endTime=ctcTime // multiple + addTime + current['endTime'])
        return self._redirect('payOK')

    def do_change_status(self, request_dict, userID, response):
        """Set the cloud storage status of a device channel.

        Args:
            request_dict: Request parameters; reads ``did``, ``status`` and ``channel``.
            userID: Id of the authenticated user; the device must belong to it.
            response: ``ResponseObject`` used to build the reply.

        Returns:
            Code 0 with the ``uidTkUrl`` and ``storeHlsUrl`` the device should
            call; 444 when a parameter is missing, 10 when the device is not
            found, not subscribed or the subscription has expired.
        """
        did = request_dict.get('did', None)
        status = request_dict.get('status', None)
        channel = request_dict.get('channel', None)
        if not did or not status or not channel:
            return response.json(444, 'did,status,channel')

        dvqs = Device_Info.objects.filter(id=did, userID_id=userID).values("UID")
        if not dvqs.exists():
            return response.json(10, '没有设备')
        UID = dvqs[0]["UID"]

        ubqs = UID_Bucket.objects.filter(channel=channel, uid=UID)
        if not ubqs.exists():
            return response.json(10, '未购买')
        if int(time.time()) > ubqs[0].endTime:
            return response.json(10, '已过期')
        ubqs.update(status=status)

        # Both device-side URLs carry the same uid token.
        utko = UidTokenObject()
        utko.generate(data={'uid': UID, 'channel': channel})
        uidTkUrl = "{SERVER_DOMAIN}cloudVod/getSts?uidToken={uidToken}".format(
            uidToken=utko.token, SERVER_DOMAIN=SERVER_DOMAIN)
        storeHlsUrl = "{SERVER_DOMAIN}cloudVod/storeplaylist?uidToken={uidToken}".format(
            uidToken=utko.token, SERVER_DOMAIN=SERVER_DOMAIN)
        return response.json(0, {'uidTkUrl': uidTkUrl, 'storeHlsUrl': storeHlsUrl})

    def do_store_palylist(self, request_dict, response):
        """Record a recording reported by a device.

        Args:
            request_dict: Request parameters; reads ``uidToken``, ``time``
                (recording start, an integer timestamp) and ``sec``.
            response: ``ResponseObject`` used to build the reply.

        Returns:
            Code 0 once the record is stored; 444 for an invalid token, a
            missing parameter or a non-numeric ``time``; 10 when the channel
            has no active subscription.
        """
        uidToken = request_dict.get('uidToken', None)
        storeTime = request_dict.get('time', None)
        sec = request_dict.get('sec', None)
        utko = UidTokenObject(uidToken)
        if utko.flag is False:
            return response.json(444, 'uidToken')
        if not uidToken or not storeTime or not sec:
            return response.json(444, 'uidToken,time,sec')
        try:
            start_time = int(storeTime)
        except ValueError:
            # A malformed timestamp is a client error, not a server crash.
            return response.json(444, 'time')

        UID = utko.UID
        channel = utko.channel
        logger.debug("storeplaylist uid=%s channel=%s", UID, channel)
        qs = UID_Bucket.objects.filter(uid=UID, channel=channel, status=1).values(
            "channel", "bucket__storeDay", "bucket_id")
        if not qs.exists():
            return response.json(10, '设备未购买')
        subscription = qs[0]
        # The record's endTime is its start plus the bucket's storeDay, in days.
        endTime = start_time + subscription['bucket__storeDay'] * 86400
        VodHlsModel.objects.create(uid=UID, channel=channel, time=storeTime, endTime=endTime,
                                   bucket_id=subscription['bucket_id'], sec=sec)
        return response.json(0)

    def do_get_playlist(self, request_dict, userID, response):
        """List the stored recordings of one hour from the database.

        Args:
            request_dict: Request parameters; reads ``daytime`` (format
                ``%Y%m%d%H``), ``did`` and ``channel``.
            userID: Id of the authenticated user; the device must belong to it.
            response: ``ResponseObject`` used to build the reply.

        Returns:
            Code 0 with a list of ``{'name', 'sign_url', 'thumb', 'sec'}``
            dicts; 444 when a parameter is missing or ``daytime`` is
            malformed, 10 when the device is not found or not subscribed.
        """
        daytime = request_dict.get('daytime', None)
        did = request_dict.get('did', None)
        channel = request_dict.get('channel', None)
        if not daytime or not did or not channel:
            return response.json(444, 'daytime,did,channel')
        try:
            timeArray = time.strptime(daytime, "%Y%m%d%H")
        except ValueError:
            return response.json(444, 'daytime')
        # NOTE(review): mktime interprets the value in the server's local
        # timezone -- confirm this matches how devices report 'time'.
        startTime = time.mktime(timeArray)
        endTime = startTime + 3600

        dvqs = Device_Info.objects.filter(id=did, userID_id=userID).values("UID")
        if not dvqs.exists():
            return response.json(10, '无设备')
        UID = dvqs[0]["UID"]
        ubqs = UID_Bucket.objects.filter(uid=UID, channel=channel).values('status')
        if not ubqs.exists():
            return response.json(10, '设备未购买')

        # Only records whose own endTime is still in the future are listed.
        vodqs = VodHlsModel.objects.filter(uid=UID, channel=channel,
                                           time__range=(startTime, endTime),
                                           endTime__gte=time.time()) \
            .values("time", "sec", "bucket__bucket", "bucket__endpoint", "bucket__region")
        # The credentials are the same for every record, so build them once.
        auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
        vod_play_list = []
        for vod in vodqs:
            bucket = oss2.Bucket(auth, vod["bucket__endpoint"], vod["bucket__bucket"])
            m3u8 = '{uid}/vod{channel}/{daytime}/{time}/{time}.m3u8'.format(
                uid=UID, channel=channel, daytime=daytime, time=vod['time'])
            ts = '{uid}/vod{channel}/{daytime}/{time}/ts0.ts'.format(
                uid=UID, channel=channel, daytime=daytime, time=vod['time'])
            vod_play_url, thumb = self._sign_playback_urls(bucket, m3u8, ts)
            vod_play_list.append({'name': vod['time'], 'sign_url': vod_play_url,
                                  'thumb': thumb, 'sec': vod['sec']})
        return response.json(0, vod_play_list)