Разбор ввода массива JSON в IoT Analytics - PullRequest
0 голосов
/ 21 февраля 2019

Я получаю одновременно несколько записей данных в виде массива JSON с устройства IoT в моем канале.Полученное сообщение выглядит следующим образом:

[
    {
      "Field1": "Value1",
      "Field2": "Value2",
      "Field3": "Value3"
    },
    {
      "Field1": "AnotherValue1",
      "Field2": "AnotherValue2",
      "Field3": "AnotherValue3"
    }
]

Я создаю набор данных, используя следующий запрос SQL:

SELECT * FROM mydatastore

Когда я запускаю набор данных, возвращается результат:

array                                              __dt 
-----                                              -----
[{field1=Value1, field2=Value2, field3=Value3}]    2019-02-21 00:00:00.000

Мой желаемый результат:

Field1           Field2           Field3
------           ------           ------
Value1           Value2           Value3
AnotherValue1    AnotherValue2    AnotherValue3

Как получить IoT Analytics для создания новой строки в хранилище данных для каждого элемента в полученном массиве JSON?

1 Ответ

0 голосов
/ 26 апреля 2019

Как я могу заставить IoT Analytics создать новую строку в хранилище данных для каждого элемента в полученном массиве JSON?

Самый простой способ - использовать лямбдуУпражнение в вашем конвейере, и пусть оно анализирует одну полезную нагрузку JSON в нужную структуру.Это в некоторой степени зависит от «сырой» структуры сообщений, отправляемых на канал.

Так, например, мы можем отправлять данные на канал через CLI batch-put-message , напримертак:

aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"array\": [{\"Field1\": \"Value1\", \"Field2\": \"Value2\", \"Field3\": \"Value3\"},{\"Field1\": \"AnotherValue1\", \"Field2\": \"AnotherValue2\", \"Field3\": \"AnotherValue3\"}]}"}]'

Канал будет тогда иметь одно сообщение, структурированное так:

{
  "messageId": "message1",
  "payload": {
    "array": [
      {
        "Field1": "Value1",
        "Field2": "Value2",
        "Field3": "Value3"
      },
      {
        "Field1": "AnotherValue1",
        "Field2": "AnotherValue2",
        "Field3": "AnotherValue3"
      }
    ]
  }
}

Если ваш конвейер имеет лямбда-активность, то сообщения из канала будутбыть переданы вашей функции Lambda в аргументе event.

Я создал простую функцию Lambda (используя Python 3.7) с помощью встроенного редактора консоли AWS Lambda и назвал ее sample_lambda:

import json
import sys
import logging

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)


def lambda_handler(event, context):
    # This can be handy to see the raw structure of the incoming event
    # will log to the matching CloudWatch log:
    # /aws/lambda/<name_of_the_lambda>
    # logger.info("raw event: {}".format(event))

    parsed_rows = []

    # Depending on the batchSize setting of the Lambda Pipeline Activity,
    # you may receive multiple messages in a single event
    for message_payload in event:
        if 'array' in message_payload:
            for row in message_payload['array']:
                parsed = {}
                for key, value in row.items():
                    parsed[key] = value
                parsed_rows.append(parsed)

    return parsed_rows

Я добавил соответствующие разрешения, чтобы IoT-Analytics мог вызывать лямбда-функцию через CLI:

aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction

При повторной обработке конвейера проанализированные строки помещаются в хранилище данных;выполняя DataSet, я получаю чистый результат:

"array","field1","field2","field3","__dt"
,"Value1","Value2","Value3","2019-04-26 00:00:00.000"
,"AnotherValue1","AnotherValue2","AnotherValue3","2019-04-26 00:00:00.000"
...