NiFi Flow для обогащения записей - PullRequest
0 голосов
/ 04 августа 2020

Я использую NiFi 1.11.4 для создания конвейера данных, по которому устройство IoT отправляет данные в формате JSON. Каждый раз, когда я получаю данные с устройства IoT, я получаю два JSON;

JSON_INITIAL

{
   devId: "abc",
   devValue: "TWOINITIALCHARS23",
}

и JSON_FINAL

{
   devId: "abc",
   devValue: "TWOINITIALCHARS45",
}

Разница во времени составляет несколько миллиметров. секунды, с которыми я получаю эти два файла потока. В моем случае мне нужно объединить этот JSON таким образом, чтобы мой результирующий JSON выглядел, как показано ниже (обратите внимание на удаление TWOINITIALCHARS в обоих случаях;

JSON_RESULT_AFTER_MERGE

{
   devId: "abc",
   devValue: "2345",
}

Это то, с чем должен иметь дело NiFi? Если да, я действительно был бы признателен за подход к разработке соответствующего потока для этого варианта использования.

1 Ответ

0 голосов
/ 05 августа 2020

Предполагается, что devId является stati c для устройства и не используется для корреляции (т.е. abc для всех сообщений, поступающих с этого устройства, а не abc для первых двух, а затем def для следующие два, et c.), у вас есть несколько вариантов:

  1. Используйте MergeContent для объединения содержимого потокового файла (два блока JSON) и ReplaceText для изменения объединенного содержимое, чтобы соответствовать желаемому результату. Для этого потребуется настроить свойства биннинга M C, чтобы ограничить окно слияния 1-2 секундами (например, сложно / недостаточно, если вы получаете несколько сообщений в секунду) и использовать регулярные выражения для удаления повторяющегося содержимого.
  2. Используйте собственный сценарий для взаимодействия с устройством JSON вывод (например, Groovy сделает взаимодействие JSON довольно простым)
    • Если вы сделаете это в пределах в контексте NiFi (через ExecuteScript или InvokeScriptedProcessor) у вас будет доступ к инфраструктуре NiFi, чтобы вы могли оценивать атрибуты и содержимое потокового файла, что упрощает задачу (будут атрибуты для начальной отметки времени и т. д. c. ).
    • Если вы сделаете это вне контекста NiFi (через ExecuteProcess или ExecuteStreamCommand), у вас не будет доступа к структуре NiFi (атрибуты и т. Д. c.), но вы можете лучше взаимодействовать с устройством напрямую.
...