# -*- encoding: utf-8 -*- """ @File : AWSIoTDataPlaneUtil.py @Time : 2023/4/13 16:43 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm @Document: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iot-data.html#iotdataplane """ import json import logging import boto3 LOGGER = logging.getLogger('info') class AWSIoTDataPlaneService: def __init__(self, aws_access_key_id, secret_access_key, region_name): self.client = boto3.client( 'iot-data', aws_access_key_id=aws_access_key_id, aws_secret_access_key=secret_access_key, region_name=region_name ) def update_thing_shadow(self, thing_name, data, shadow_name=None): """ 更新指定事物的影子 @param thing_name: 物品名称 @param data: 更新数据 @param shadow_name: 自定义影子名称(使用经典影子可不填) @return: 更新状态 """ try: params = { 'thingName': thing_name, 'payload': json.dumps(data) } if shadow_name: params['shadowName'] = shadow_name response = self.client.update_thing_shadow(**params) assert response['ResponseMetadata']['HTTPStatusCode'] == 200 return True except Exception as e: LOGGER.info('更新设备影子异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return False