123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- # -*- coding: utf-8 -*-
- """
- @Author : Rocky
- @Time : 2022/10/18 17:13
- @File :AmazonKinesisVideoUtil.py
- """
- import boto3
- class AmazonKinesisVideoObject:
- """
- Amazon Kinesis Video Streams对象
- api文档链接: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html
- """
- def __init__(self, aws_access_key_id, secret_access_key, region_name):
- self.access_id = aws_access_key_id
- self.access_secret = secret_access_key
- self.region_name = region_name
- self.client_conn = boto3.client(
- 'kinesisvideo',
- aws_access_key_id=aws_access_key_id,
- aws_secret_access_key=secret_access_key,
- region_name=region_name
- )
- def create_stream(self, stream_name):
- """
- 创建视频流
- @param stream_name: 视频流名称
- @return : stream_arn or False
- """
- tags = {'project_kvs': 'kvs'}
- try:
- stream_arn = self.client_conn.create_stream(StreamName=stream_name, Tags=tags)['StreamARN']
- return stream_arn
- except Exception as e:
- print(e)
- return False
- def update_data_retention(self, stream_name, operation, data_retention_change_in_hours):
- """
- 修改视频流数据保留时间
- @param stream_name: 视频流名称
- @param operation: 增加/减少, 'INCREASE_DATA_RETENTION'|'DECREASE_DATA_RETENTION'
- @param data_retention_change_in_hours: 修改的时间
- @return : True or False
- """
- try:
- version = self.describe_stream(stream_name)['Version']
- self.client_conn.update_data_retention(StreamName=stream_name, Operation=operation, CurrentVersion=version,
- DataRetentionChangeInHours=data_retention_change_in_hours)
- return True
- except Exception as e:
- print(e)
- return False
- def describe_stream(self, stream_name):
- """
- 获取视频流信息数据
- @param stream_name: 视频流名称
- @return stream_info: 视频流信息数据
- """
- return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo']
- def get_data_endpoint(self, stream_name, api_name):
- """
- 获取指定流的终端节点以读取或写入
- @param api_name: API名称
- @param stream_name: 视频流名称
- @return stream_info: 视频流信息数据
- """
- return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint']
|