123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- # -*- encoding: utf-8 -*-
- """
- @File : AWSIoTDataPlaneUtil.py
- @Time : 2023/4/13 16:43
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- @Document: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iot-data.html#iotdataplane
- """
- import json
- import logging
- import boto3
LOGGER = logging.getLogger('info')


class AWSIoTDataPlaneService:
    """Small wrapper around the boto3 ``iot-data`` client for thing shadows.

    Both public methods catch every exception, log it on the ``info`` logger
    and return ``False`` instead of raising.
    """

    def __init__(self, aws_access_key_id, secret_access_key, region_name):
        """Create the underlying boto3 ``iot-data`` client.

        @param aws_access_key_id: AWS access key id passed to boto3
        @param secret_access_key: AWS secret access key passed to boto3
        @param region_name: AWS region name passed to boto3
        """
        self.client = boto3.client(
            'iot-data',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name
        )

    @staticmethod
    def _base_params(thing_name, shadow_name=None):
        """Build the keyword arguments shared by the shadow API calls."""
        params = {'thingName': thing_name}
        # 'shadowName' is only sent for a named shadow; omitting it targets
        # the classic (unnamed) shadow.
        if shadow_name:
            params['shadowName'] = shadow_name
        return params

    def update_thing_shadow(self, thing_name, data, shadow_name=None) -> bool:
        """Update the shadow of the given thing.

        @param thing_name: name of the thing
        @param data: update document; serialized with ``json.dumps``
        @param shadow_name: named-shadow name (omit for the classic shadow)
        @return: True when the service answers with HTTP 200, False on any
                 other status or on any exception (the error is logged)
        """
        try:
            params = self._base_params(thing_name, shadow_name)
            params['payload'] = json.dumps(data)
            response = self.client.update_thing_shadow(**params)
            status_code = response['ResponseMetadata']['HTTPStatusCode']
            # Explicit check instead of ``assert``: asserts are stripped under
            # ``python -O``, which would make every response look successful.
            if status_code != 200:
                raise ValueError(
                    'unexpected HTTP status code: {}'.format(status_code))
            return True
        except Exception as e:
            LOGGER.info('更新设备影子异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False

    def get_thing_shadow(self, thing_name, shadow_name=None):
        """Fetch the shadow of the given thing.

        @param thing_name: name of the thing
        @param shadow_name: named-shadow name (omit for the classic shadow)
        @return: the shadow document parsed from the JSON response payload,
                 or False when any exception occurs (the error is logged)
        """
        try:
            params = self._base_params(thing_name, shadow_name)
            response = self.client.get_thing_shadow(**params)
            # The payload is exposed as a stream-like object; read it fully
            # before decoding the JSON document.
            return json.loads(response['payload'].read())
        except Exception as e:
            LOGGER.info('获取设备影子异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False
|