Ниже приведен класс, который я создал для взаимодействия с AWS тенями IoT, в котором есть способы чтения, обновления и удаления теней. Главное здесь - я использовал клиент boto3 для подключения к AWS IoT, и это позволяет мне pu sh и тягу JSON для хранения в облаке (тень предмета).
import boto3
import logging
import json
# Initialize logger for CloudWatch logs
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class IoTOps:
"""
Params: iotClient boto3 client used to update thing's shadow and also send
messages to the thing
topic STRING - the MQTT Topic to send a message to
QoS - INT - Essentially the channel of a specific topic to communicate
over
"""
def __init__(self, iotConfig):
self.iotClient = boto3.client('iot-data', region_name='us-east-1')
self.topic = iotConfig['topic']
self.QoS = iotConfig['QoS']
self.thingName = iotConfig['thingName']
def publish_mqtt_message(self, payload):
"""
Description: Send MQTT message to MQTT Topic
Return: None
"""
# Publish a MQTT message to a topic for our thing to ingest
logger.info('publishing MQTT message to topic: {}'.format(self.topic))
self.iotClient.publish(
topic=self.topic,
qos=self.QoS,
payload=json.dumps(payload)
)
return
def update_shadow(self, payload):
"""
Summary: Updates a things shadow for a specified thing
Return The state information in a JSON format
"""
updatedPayload = {'state': {'desired': payload}}
response = self.iotClient.update_thing_shadow(
thingName=self.thingName,
payload=json.dumps(updatedPayload)
)
return response
def get_shadow(self):
"""
Summary: gets thing shadow for a specified thing
Return: The state information in JSON format
"""
response = self.iotClient.get_thing_shadow(
thingName=self.thingName
)
response = json.loads(response['payload'].read().decode('utf-8'))['state']['desired']
return response
def delete_shadow(self):
"""
Summary: deletes thing shadow for a specified thing
Return: The state information in a JSON format
"""
response = self.iotClient.delete_thing_shadow(
thingName=self.thingName
)
return response