У меня есть входящие записи Avro, которые примерно соответствуют формату ниже.Я могу читать их и конвертировать в существующие потоки NiFi.Однако недавнее изменение требует, чтобы я прочитал из этих файлов и проанализировал вложенную запись, employers
в этом примере.Я прочитал сообщение в блоге Apache NiFi, Запись данных с помощью NiFi , но не смог выяснить, как заставить AvroRecordReader анализировать вложенные записи.
{
"name": "recordFormatName",
"namespace": "nifi.examples",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "gender", "type": "string" },
{ "name": "employers",
"type": "record",
"fields": [
{"name": "company", "type": "string"},
{"name": "guid", "type": "string"},
{"name": "streetaddress", "type": "string"},
{"name": "city", "type": "string"}
]}
]
}
Я надеюсь достичь потока чтения employers
записей для каждой записи recordFormatName
и использования процессора PutDatabaseRecord для отслеживания увиденных значений employers
.Текущий план состоит в том, чтобы вставить записи в базу данных MySQL.Как предлагается в ответе ниже, я планирую использовать PartitionRecord для сортировки записей на основе значения в подзаписи employers
.Мне не нужны детали верхнего уровня для этого конкретного потока.
Я попытался выполнить синтаксический анализ AvroRecordReader, но не могу понять, как указать вложенные записи.Может ли это быть выполнено только с помощью AvroRecordReader или выполняется предварительная обработка, скажем, сначала необходимо выполнить преобразование JOLT?
РЕДАКТИРОВАТЬ: добавлены дополнительные сведения о базе данных после получения ответа.