Подключение senesor к aws iot - PullRequest
0 голосов
/ 03 марта 2020

Мы пытались подключить датчик к aws iot. Датчик подключен к aws, но мы не можем просмотреть сообщение в тени. Наша тень не обновляется. Пожалуйста, предложите несколько способов, чтобы мы могли обновить нашу тень.

1 Ответ

0 голосов
/ 03 марта 2020

Ниже приведен класс, который я создал для взаимодействия с AWS тенями IoT, в котором есть способы чтения, обновления и удаления теней. Главное здесь - я использовал клиент boto3 для подключения к AWS IoT, и это позволяет мне pu sh и тягу JSON для хранения в облаке (тень предмета).

import boto3
import logging
import json

# Initialize logger for CloudWatch logs
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class IoTOps:
    """
    Params: iotClient boto3 client used to update thing's shadow and also send 
    messages to the thing
        topic STRING - the MQTT Topic to send a message to
        QoS - INT - Essentially the channel of a specific topic to communicate
        over
    """
    def __init__(self, iotConfig):
        self.iotClient = boto3.client('iot-data', region_name='us-east-1')
        self.topic = iotConfig['topic']
        self.QoS = iotConfig['QoS']
        self.thingName = iotConfig['thingName']

    def publish_mqtt_message(self, payload):
        """                                                                        
        Description: Send MQTT message to MQTT Topic                         

        Return: None                                                          
        """                                                                       
        # Publish a MQTT message to a topic for our thing to ingest               
        logger.info('publishing MQTT message to topic: {}'.format(self.topic))         
        self.iotClient.publish(                                                           
            topic=self.topic,                     
            qos=self.QoS,                                                               
            payload=json.dumps(payload)                                   
        )  
        return

    def update_shadow(self, payload):
        """
        Summary: Updates a things shadow for a specified thing
        Return The state information in a JSON format
        """
        updatedPayload = {'state': {'desired': payload}}
        response = self.iotClient.update_thing_shadow(
            thingName=self.thingName,
            payload=json.dumps(updatedPayload)
        )
        return response

    def get_shadow(self):
        """
        Summary: gets thing shadow for a specified thing
        Return: The state information in JSON format
        """
        response = self.iotClient.get_thing_shadow(
            thingName=self.thingName
        )
        response = json.loads(response['payload'].read().decode('utf-8'))['state']['desired']
        return response

    def delete_shadow(self):
        """
        Summary: deletes thing shadow for a specified thing
        Return: The state information in a JSON format
        """
        response = self.iotClient.delete_thing_shadow(
            thingName=self.thingName
        )
        return response
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...