Как правильно анализировать вложенные записи Avro в NiFi? - PullRequest
0 голосов
/ 21 мая 2018

У меня есть входящие записи Avro, которые примерно соответствуют формату ниже.Я могу читать их и конвертировать в существующие потоки NiFi.Однако недавнее изменение требует, чтобы я прочитал из этих файлов и проанализировал вложенную запись, employers в этом примере.Я прочитал сообщение в блоге Apache NiFi, Запись данных с помощью NiFi , но не смог выяснить, как заставить AvroRecordReader анализировать вложенные записи.

{
  "name": "recordFormatName",
  "namespace": "nifi.examples",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "gender", "type": "string" },
    { "name": "employers",
        "type": "record",
        "fields": [
            {"name": "company", "type": "string"},
            {"name": "guid", "type": "string"},
            {"name": "streetaddress", "type": "string"},
            {"name": "city", "type": "string"}
        ]}
  ]
}

Я надеюсь достичь потока чтения employers записей для каждой записи recordFormatName и использования процессора PutDatabaseRecord для отслеживания увиденных значений employers.Текущий план состоит в том, чтобы вставить записи в базу данных MySQL.Как предлагается в ответе ниже, я планирую использовать PartitionRecord для сортировки записей на основе значения в подзаписи employers.Мне не нужны детали верхнего уровня для этого конкретного потока.

Я попытался выполнить синтаксический анализ AvroRecordReader, но не могу понять, как указать вложенные записи.Может ли это быть выполнено только с помощью AvroRecordReader или выполняется предварительная обработка, скажем, сначала необходимо выполнить преобразование JOLT?

РЕДАКТИРОВАТЬ: добавлены дополнительные сведения о базе данных после получения ответа.

1 Ответ

0 голосов
/ 21 мая 2018

Какова ваша целевая БД и как выглядит ваша целевая таблица?PutDatabaseRecord, возможно, не сможет обрабатывать вложенные записи, если ваша БД, драйвер и целевая таблица не поддерживают их.

В качестве альтернативы вам может понадобиться использовать UpdateRecord , чтобы сгладить объект "работодатели" в поля вверхний уровень записи.Это ручной процесс (до реализации NIFI-4398 ), но у вас есть только 4 поля.После выравнивания записей вы можете использовать PartitionRecord , чтобы получить все записи с определенным значением, например, для Employers.company.Файлы исходящего потока из PartitionRecord технически будут представлять собой отдельные значения для полей раздела.Я не уверен, что вы делаете с различными ценностями, но если вы можете уточнить, я был бы рад помочь.

...