#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD018
@NAME: AnsjerFormal
@software: PyCharm
@DATE: 2018/12/5 9:30
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: CloudVod.py
@Contact: chanjunkai@163.com
"""
import json
import math
import time
import urllib
import oss2
import paypalrestsdk
from aliyunsdkcore import client
from aliyunsdksts.request.v20150401 import AssumeRoleRequest
from django.http import JsonResponse,HttpResponseRedirect,HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, OSS_ROLE_ARN, SERVER_DOMAIN
from Model.models import Device_Info, Order_Model, Store_Meal, VodHlsModel, OssCrdModel, UID_Bucket
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Object.UidTokenObject import UidTokenObject
from Service.CommonService import CommonService
'''
# 获取设备推送hls流 证书
http://192.168.136.40:8077/cloudVod/getSts?uidToken=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1aWQiOiJGVFNMTDhITTQzN1ozOFdVMTExQSIsImNoYW5uZWwiOiI0In0.HO-PzoRwhQ4CFNkjthqOitf48c-XOvHjtNGCeUmBe9g
# 获取存储的播放文件列表
#修改状态
http://192.168.136.40:8077/cloudVod/status?token=local&did=138001380001543918745881545&channel=4&status=1
# 回调vod
http://192.168.136.40:8077/cloudVod/storeplaylist?uidToken=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1aWQiOiJGVFNMTDhITTQzN1ozOFdVMTExQSIsImNoYW5uZWwiOiI0In0.HO-PzoRwhQ4CFNkjthqOitf48c-XOvHjtNGCeUmBe9g&time=1234567891
=============================
# 生成订单
http://test.dvema.com/cloudVod/createOrder?token=test&did=138001380001544514277661990&channel=4&rank=1
# 修改设备云存状态
http://test.dvema.com/cloudVod/status?did=138001380001544514277661990&channel=4&token=test&status=1
# 获取指定设备云存关联信息
http://test.dvema.com/cloudVod/details?token=test&did=138001380001540342559510534
# 获取回放列表
http://test.dvema.com/cloudVod/getHlsList?did=138001380001544514277661990&channel=4&token=test&daytime=2018121001
2设备端
http://test.dvema.com/cloudVod/getSts?uidToken=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjaGFubmVsIjoiNCIsInVpZCI6IkZUU0xMOEhNNDM3WjM4V1UxMTFBIn0.wkrwYvIYf5qEukOSTxALSAgSqop-gNBdEvSwScOgYB8
'''
# Cloud VOD (cloud video storage) API view
class CloudVodView(View):
    """HTTP entry point for the cloud VOD (cloud video storage) API.

    The URL kwarg ``operation`` selects the action. ``getSts``,
    ``storeplaylist``, ``payExecute``, ``payOK`` and ``payError`` are handled
    without a user token (the first two authenticate with a device
    ``uidToken``); every other operation requires a valid user ``token``.
    """

    # NOTE(review): PayPal REST credentials are hard-coded in source and the
    # SDK runs in sandbox mode; these should be loaded from configuration.
    _PAYPAL_CONFIG = {
        "mode": "sandbox",  # sandbox or live
        "client_id": "AfnfDqezODOoWGS-W2Itu-Zl1ay1R95IsGlMqPghPA3KGhkPndNMnQT0bdEewvSv92XAFIfLiinmyhBL",
        "client_secret": "EErLskwYA1xXY3890mHx5OhzgK83B2rNc57zIozGNyKc8i6RJuhPTF9WyhhdZgyDEih0heo1MH9Jk1lj",
    }

    # Lifetime, in seconds, of the signed playback / thumbnail URLs.
    _SIGN_URL_EXPIRES = 3600

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request with CSRF protection disabled."""
        return super(CloudVodView, self).dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET: route by ``operation`` using the query parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: route by ``operation`` using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler, checking the user token when required.

        :param request_dict: request parameters (``request.GET`` or ``request.POST``).
        :param request: the Django request, used only to read the client IP.
        :param operation: operation name taken from the URL; ``None`` yields 444.
        :return: the HTTP response produced by the selected handler.
        """
        response = ResponseObject()
        if operation is None:
            return response.json(444, 'error path')

        # Operations that do not take a user token.
        if operation == 'getSts':
            # Device requests STS upload credentials.
            ip = CommonService.get_ip_address(request)
            return self.do_getSts(request_dict, ip, response)
        if operation == 'payExecute':
            # PayPal redirects here once the payer approved the payment.
            return self.do_paypal_execute(request_dict, response)
        if operation == 'storeplaylist':
            return self.do_store_palylist(request_dict, response)
        if operation == 'payOK':
            return self.do_pay_ok()
        if operation == 'payError':
            return self.do_pay_error()

        # Everything else requires a valid user token.
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        tko.valid()
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        userID = tko.userID

        handlers = {
            'getHlsList': self.do_get_hls_list,
            'createOrder': self.do_create_order,
            'status': self.do_change_status,
            'playlist': self.do_get_playlist,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, userID, response)

    def do_pay_error(self):
        """Return the plain page shown after a failed or cancelled payment."""
        response = HttpResponse()
        response.content = '\nmsg\n付款失败\n'
        return response

    def do_pay_ok(self):
        """Return the plain page shown after a successful payment."""
        response = HttpResponse()
        response.content = '\nmsg\n付款成功\n'
        return response

    def _configure_paypal(self):
        """Configure the PayPal SDK with this view's credentials."""
        # Pass a copy so the SDK can never mutate the class-level constant.
        paypalrestsdk.configure(dict(self._PAYPAL_CONFIG))

    def _sign_play_urls(self, bucket, m3u8_key, thumb_key):
        """Build the signed playlist URL and the signed thumbnail URL.

        :param bucket: ``oss2.Bucket`` holding the recording.
        :param m3u8_key: object key of the ``.m3u8`` playlist.
        :param thumb_key: object key of the segment used for the snapshot.
        :return: ``(playlist_url, thumbnail_url)``.
        """
        # Imported explicitly: the module only does ``import urllib``, which
        # does not guarantee that the ``urllib.parse`` submodule is loaded.
        from urllib.parse import unquote

        signed = bucket.sign_url('GET', m3u8_key, self._SIGN_URL_EXPIRES,
                                 params={'x-oss-process': 'hls/sign'})
        # Only the path part is unquoted; the signed query string is kept
        # verbatim.
        url_start, _, url_end = signed.partition('?')
        play_url = '{url_start}?{url_end}'.format(url_start=unquote(url_start), url_end=url_end)
        thumb_url = bucket.sign_url('GET', thumb_key, self._SIGN_URL_EXPIRES,
                                    params={'x-oss-process': 'video/snapshot,t_10000,m_fast,w_300'})
        return play_url, thumb_url

    def do_create_order(self, request_dict, userID, response):
        """Create a PayPal payment and a local order for a storage package.

        Reads ``did`` (device id), ``channel`` and ``rank`` (package id) from
        the request.

        :return: JSON with ``redirectUrl`` (the PayPal approval URL) on
            success, otherwise a JSON error (444 missing parameters, 13
            device not owned by the user, 10 other failures).
        """
        did = request_dict.get('did', None)
        rank = request_dict.get('rank', None)
        channel = request_dict.get('channel', None)
        if not did or not channel or not rank:
            return response.json(444, 'did,channel,rank')

        meal = Store_Meal.objects.filter(id=rank) \
            .values("currency", "price", "content", "day", "bucket__storeDay").first()
        if meal is None:
            return response.json(10, '套餐不存在')
        currency = meal['currency']
        price = meal['price']
        content = meal['content']
        day = meal['day']

        device = Device_Info.objects.filter(userID_id=userID, id=did).values("UID").first()
        if device is None:
            return response.json(13)
        uid = device['UID']

        # Refuse packages whose retention is shorter than the current one.
        current = UID_Bucket.objects.filter(uid=uid, channel=channel).values("bucket__storeDay").first()
        if current is not None and current['bucket__storeDay'] > meal['bucket__storeDay']:
            return response.json(10, '不可降级')

        orderID = CommonService.createOrderID()
        call_sub_url = "{SERVER_DOMAIN}cloudVod/payExecute?orderID={orderID}". \
            format(SERVER_DOMAIN=SERVER_DOMAIN, orderID=orderID)
        # A cancelled checkout lands on the public failure page; the previous
        # value pointed at a private LAN address.
        call_clc_url = "{SERVER_DOMAIN}cloudVod/payError".format(SERVER_DOMAIN=SERVER_DOMAIN)

        self._configure_paypal()
        payment = paypalrestsdk.Payment({
            "intent": "sale",
            "payer": {"payment_method": "paypal"},
            "redirect_urls": {"return_url": call_sub_url, "cancel_url": call_clc_url},
            "transactions": [{
                "item_list": {
                    # The item currency must match the amount currency.
                    "items": [{"name": "Cloud video", "sku": "1", "price": price,
                               "currency": currency, "quantity": 1}]},
                "amount": {"total": price, "currency": currency},
                "description": content
            }]})
        if not payment.create():
            print(payment.error)
            return response.json(10, payment.error)
        print("Payment created successfully")

        approval_url = next((str(link.href) for link in payment.links if link.rel == "approval_url"), None)
        if approval_url is None:
            return response.json(10, 'generate_order_false')
        print("Redirect for approval: %s" % (approval_url))

        nowTime = CommonService.get_utc()
        Order_Model.objects.create(orderID=orderID, UID=uid, channel=channel, userID_id=userID, desc=content,
                                   price=price, currency=currency, addTime=nowTime, updTime=nowTime,
                                   endTime=nowTime + int(day) * 3600 * 24,
                                   rank_id=rank,
                                   paypal=approval_url)
        return response.json(0, {"redirectUrl": approval_url})

    def do_getSts(self, request_dict, ip, response):
        """Issue temporary STS upload credentials to a device.

        The device authenticates with ``uidToken``. Credentials are scoped to
        ``{uid}/vod{channel}/`` in the bucket bound to the device and are
        cached in ``OssCrdModel`` so they are reused for 3500 seconds.

        :param ip: client IP address; only echoed back in the response.
        :return: JSON credentials with ``code`` 0, ``{'code': 403}`` when the
            device has no active, unexpired storage, or 444 for a bad token.
        """
        uidToken = request_dict.get('uidToken', None)
        utko = UidTokenObject(uidToken)
        if utko.flag is False:
            return response.json(444, 'uidToken')
        UID = utko.UID
        channel = utko.channel

        bucket_row = UID_Bucket.objects.filter(uid=UID, channel=channel, status=1) \
            .values('channel', 'bucket__bucket', 'bucket__endpoint', 'bucket__region', 'endTime').first()
        now_time = CommonService.get_utc()
        # Not purchased / not enabled, or already expired.
        if bucket_row is None or now_time > bucket_row['endTime']:
            return JsonResponse(status=200, data={'code': 403})

        # Reuse cached credentials while they are younger than 3500 s (they
        # are requested below with a 3600 s lifetime).
        oc_qs = OssCrdModel.objects.filter(uid=UID, channel=channel)
        cached = oc_qs.values("addTime", "data").first()
        if cached is not None and int(cached["addTime"]) + 3500 > now_time:
            return JsonResponse(status=200, data=json.loads(cached["data"]))

        storage = '{uid}/vod{channel}/'.format(uid=UID, channel=channel)
        bucket_name = bucket_row['bucket__bucket']
        endpoint = bucket_row['bucket__endpoint']
        region_id = bucket_row['bucket__region']

        clt = client.AcsClient(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, region_id)
        req = AssumeRoleRequest.AssumeRoleRequest()
        # Ask for a JSON response body.
        req.set_accept_format('json')
        req.set_RoleArn(OSS_ROLE_ARN)
        req.set_RoleSessionName(UID)
        req.set_DurationSeconds(3600)
        Resource_access = "acs:oss:*:*:{bucket_name}/{uid_channel}*".format(bucket_name=bucket_name,
                                                                            uid_channel=storage)
        # Only allow writing/deleting objects under the device's own prefix.
        # No source-IP condition is applied.
        policys = {
            "Version": "1",
            "Statement": [
                {
                    "Action": ["oss:PutObject", "oss:DeleteObject", ],
                    "Resource": [Resource_access],
                    "Effect": "Allow",
                    "Condition": {}
                }
            ]
        }
        req.set_Policy(Policy=json.dumps(policys))
        # Exchange the RAM account's access key for a temporary STS token.
        body = clt.do_action(req)
        # The decoded token is deliberately not printed: it contains secrets.
        token = json.loads(body.decode('utf-8'))
        credentials = token['Credentials']
        res = {
            'AccessKeyId': credentials['AccessKeyId'],
            'AccessKeySecret': credentials['AccessKeySecret'],
            'SecurityToken': credentials['SecurityToken'],
            'Expiration': credentials['Expiration'],
            'expire': '3600',
            'endpoint': endpoint,
            'bucket_name': bucket_name,
            'arn': token['AssumedRoleUser']['Arn'],
            'code': 0,
            'storage': storage,
            'ip': ip}

        if cached is not None:
            oc_qs.update(data=json.dumps(res), addTime=now_time)
        else:
            OssCrdModel.objects.create(uid=UID, channel=channel, data=json.dumps(res), addTime=now_time)
        return JsonResponse(status=200, data=res)

    def do_get_hls_list(self, request_dict, userID, response):
        """List the recordings of a device by scanning its OSS directory.

        Reads ``did``, ``channel`` and ``daytime`` from the request and lists
        the sub-directories of ``{uid}/vod{channel}/{daytime}/``.

        :return: JSON list of ``{'name', 'sign_url', 'thumb'}`` entries, or a
            JSON error (444 missing parameters, 13 device not owned by the
            user, 10 not purchased / expired).
        """
        did = request_dict.get('did', None)
        channel = request_dict.get('channel', None)
        daytime = request_dict.get('daytime', None)
        if not did or not channel or not daytime:
            return response.json(444, 'did,channel,daytime')

        device = Device_Info.objects.filter(userID_id=userID, id=did).values("UID").first()
        if device is None:
            return response.json(13)
        uid = device['UID']

        bucket_row = UID_Bucket.objects.filter(uid=uid, channel=channel) \
            .values('endTime', 'bucket__bucket', 'bucket__endpoint').first()
        if bucket_row is None:
            return response.json(10, '未购买')
        if time.time() > bucket_row['endTime']:
            return response.json(10, '已过期')

        auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
        bucket = oss2.Bucket(auth, bucket_row["bucket__endpoint"], bucket_row["bucket__bucket"])
        prefix = '{uid}/vod{channel}/{daytime}/'.format(uid=uid, channel=channel, daytime=daytime)

        vod_play_list = []
        for obj in oss2.ObjectIterator(bucket=bucket, prefix=prefix, delimiter='/', max_keys=2):
            # Each recording is a directory; plain files are ignored.
            if not obj.is_prefix():
                continue
            print('directory: ' + obj.key)
            # Key layout: {uid}/vod{channel}/{daytime}/{ptime}/
            ptime = obj.key.split('/')[3]
            m3u8_key = '{prefix}{name}.m3u8'.format(prefix=obj.key, name=ptime)
            thumb_key = '{prefix}ts0.ts'.format(prefix=obj.key)
            vod_play_url, thumb = self._sign_play_urls(bucket, m3u8_key, thumb_key)
            vod_play_list.append({'name': ptime, 'sign_url': vod_play_url, 'thumb': thumb})
        return response.json(0, vod_play_list)

    def do_paypal_execute(self, request_dict, response):
        """Capture an approved PayPal payment and grant the purchased storage.

        PayPal redirects the payer here with ``paymentId`` and ``PayerID``;
        ``orderID`` is the one embedded in the return URL at order creation.

        :return: a redirect to ``payOK`` on success or ``payError`` on any
            payment failure; a JSON response in the "package deleted" and
            "downgrade" cases.
        """
        error_url = "{SERVER_DOMAIN}cloudVod/payError".format(SERVER_DOMAIN=SERVER_DOMAIN)
        paymentId = request_dict.get('paymentId', None)
        PayerID = request_dict.get('PayerID', None)
        orderID = request_dict.get('orderID', None)
        if not paymentId or not PayerID or not orderID:
            return HttpResponseRedirect(error_url)

        # Resolve the order before money moves, so an unknown order id can
        # never result in a captured payment followed by a crash.
        # NOTE(review): nothing verifies that paymentId belongs to this
        # order - confirm whether that needs to be enforced.
        order_qs = Order_Model.objects.filter(orderID=orderID)
        order = order_qs.values("UID", "rank_id", "channel").first()
        if order is None:
            return HttpResponseRedirect(error_url)

        self._configure_paypal()
        try:
            # ID of the payment, provided by PayPal when it was created.
            payment = paypalrestsdk.Payment.find(paymentId)
        except paypalrestsdk.ResourceNotFound as error:
            print(error)
            return HttpResponseRedirect(error_url)
        if not payment.execute({"payer_id": PayerID}):
            print(payment.error)  # Error Hash
            return HttpResponseRedirect(error_url)
        print("Payment execute successfully")

        order_qs.update(status=1, updTime=CommonService.get_utc())
        UID = order['UID']
        channel = order['channel']

        meal = Store_Meal.objects.filter(id=order['rank_id']) \
            .values("day", "bucket_id", "bucket__storeDay").first()
        if meal is None:
            return response.json(0, '套餐已删除')
        bucketId = meal['bucket_id']
        addTime = int(meal["day"]) * 24 * 3600

        ubqs = UID_Bucket.objects.filter(uid=UID, channel=channel)
        current = ubqs.values("bucket_id", "endTime", "bucket__storeDay").first()
        nowTime = CommonService.get_utc()
        if current is None:
            # First purchase for this device/channel.
            UID_Bucket.objects.create(
                uid=UID,
                channel=channel,
                bucket_id=bucketId,
                endTime=nowTime + addTime
            )
        elif nowTime > current['endTime']:
            # Expired: the new period starts now.
            # NOTE(review): bucket_id is left unchanged even if the package
            # uses another bucket - confirm this is intended.
            ubqs.update(endTime=nowTime + addTime)
        elif bucketId == current['bucket_id']:
            # Renewal on the same bucket: extend the current period.
            ubqs.update(endTime=current['endTime'] + addTime)
        elif current['bucket__storeDay'] > meal['bucket__storeDay']:
            return response.json(10, '不可降级')
        else:
            # Upgrade: the remaining time is scaled down by the retention
            # ratio and added on top of the new period.
            origin_storeDay = int(current['bucket__storeDay'])
            upgrade_storeDay = int(meal['bucket__storeDay'])
            ctcTime = current['endTime'] - nowTime
            # Guard against a zero-day retention on the current bucket.
            multiple = math.ceil(upgrade_storeDay / origin_storeDay) if origin_storeDay > 0 else 1
            # NOTE(review): the result is added to the old endTime (not to
            # nowTime) and bucket_id is not switched - confirm both.
            ubqs.update(endTime=int(ctcTime / multiple) + addTime + current['endTime'])

        red_url = "{SERVER_DOMAIN}cloudVod/payOK".format(SERVER_DOMAIN=SERVER_DOMAIN)
        return HttpResponseRedirect(red_url)

    def do_change_status(self, request_dict, userID, response):
        """Set the cloud storage status of a device channel.

        Reads ``did``, ``channel`` and ``status`` from the request.

        :return: JSON with ``uidTkUrl`` and ``storeHlsUrl``, both carrying a
            freshly generated ``uidToken``, or a JSON error.
        """
        did = request_dict.get('did', None)
        status = request_dict.get('status', None)
        channel = request_dict.get('channel', None)
        if not did or not status or not channel:
            return response.json(444, 'did,status,channel')

        device = Device_Info.objects.filter(id=did, userID_id=userID).values("UID").first()
        if device is None:
            return response.json(10, '没有设备')
        UID = device["UID"]

        ubqs = UID_Bucket.objects.filter(channel=channel, uid=UID)
        current = ubqs.first()
        if current is None:
            return response.json(10, '未购买')
        if CommonService.get_utc() > current.endTime:
            return response.json(10, '已过期')
        ubqs.update(status=status)

        utko = UidTokenObject()
        utko.generate(data={'uid': UID, 'channel': channel})
        uidTkUrl = "{SERVER_DOMAIN}cloudVod/getSts?uidToken={uidToken}". \
            format(uidToken=utko.token, SERVER_DOMAIN=SERVER_DOMAIN)
        storeHlsUrl = "{SERVER_DOMAIN}cloudVod/storeplaylist?uidToken={uidToken}". \
            format(uidToken=utko.token, SERVER_DOMAIN=SERVER_DOMAIN)
        return response.json(0, {'uidTkUrl': uidTkUrl, 'storeHlsUrl': storeHlsUrl})

    def do_store_palylist(self, request_dict, response):
        """Record an uploaded recording reported by a device.

        Reads ``uidToken``, ``time`` (recording start, epoch seconds) and
        ``sec`` from the request and stores a ``VodHlsModel`` row that
        expires ``storeDay`` days after ``time``.

        :return: JSON code 0 on success, 444 for bad parameters, 10 when the
            device has no enabled storage.
        """
        uidToken = request_dict.get('uidToken', None)
        storeTime = request_dict.get('time', None)
        sec = request_dict.get('sec', None)
        utko = UidTokenObject(uidToken)
        if utko.flag is False:
            return response.json(444, 'uidToken')
        if not uidToken or not storeTime or not sec:
            return response.json(444, 'uidToken,time,sec')
        try:
            store_ts = int(storeTime)
        except (TypeError, ValueError):
            # A non-numeric timestamp is a client error, not a server crash.
            return response.json(444, 'time')

        UID = utko.UID
        channel = utko.channel
        bucket_row = UID_Bucket.objects.filter(uid=UID, channel=channel, status=1) \
            .values("channel", "bucket__storeDay", "bucket_id").first()
        if bucket_row is None:
            return response.json(10, '设备未购买')

        endTime = store_ts + bucket_row['bucket__storeDay'] * 86400
        VodHlsModel.objects.create(uid=UID, channel=channel, time=storeTime, endTime=endTime,
                                   bucket_id=bucket_row['bucket_id'], sec=sec)
        return response.json(0)

    def do_get_playlist(self, request_dict, userID, response):
        """List the stored recordings of a device for one hour.

        Reads ``did``, ``channel`` and ``daytime`` (``YYYYmmddHH``) from the
        request and returns the unexpired ``VodHlsModel`` rows whose ``time``
        falls within that hour.

        :return: JSON list of ``{'name', 'sign_url', 'thumb', 'sec'}``
            entries, or a JSON error (444 bad parameters, 10 no device /
            not purchased).
        """
        daytime = request_dict.get('daytime', None)
        did = request_dict.get('did', None)
        channel = request_dict.get('channel', None)
        if not did or not channel or not daytime:
            return response.json(444, 'did,channel,daytime')
        try:
            # mktime interprets the parsed value in the server's local zone.
            startTime = time.mktime(time.strptime(daytime, "%Y%m%d%H"))
        except (ValueError, OverflowError):
            return response.json(444, 'daytime')
        endTime = startTime + 3600

        device = Device_Info.objects.filter(id=did, userID_id=userID).values("UID").first()
        if device is None:
            return response.json(10, '无设备')
        UID = device["UID"]
        if not UID_Bucket.objects.filter(uid=UID, channel=channel).exists():
            return response.json(10, '设备未购买')

        nowTime = time.time()
        vodqs = VodHlsModel.objects.filter(uid=UID, channel=channel, time__range=(startTime, endTime),
                                           endTime__gte=nowTime) \
            .values("time", "sec", "bucket__bucket", "bucket__endpoint", "bucket__region")

        auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
        vod_play_list = []
        for vod in vodqs:
            # Each row may live in a different bucket.
            bucket = oss2.Bucket(auth, vod["bucket__endpoint"], vod["bucket__bucket"])
            m3u8 = '{uid}/vod{channel}/{daytime}/{time}/{time}.m3u8'. \
                format(uid=UID, channel=channel, daytime=daytime, time=vod['time'])
            ts = '{uid}/vod{channel}/{daytime}/{time}/ts0.ts'. \
                format(uid=UID, channel=channel, daytime=daytime, time=vod['time'])
            vod_play_url, thumb = self._sign_play_urls(bucket, m3u8, ts)
            vod_play_list.append({'name': vod['time'], 'sign_url': vod_play_url, 'thumb': thumb,
                                  'sec': vod['sec']})
        return response.json(0, vod_play_list)