Изменить / обновить имя поля в свойстве текста схемы NiFi через различные параллельные потоки - PullRequest
0 голосов
/ 11 декабря 2018

У меня есть несколько идентичных параллельных потоков (как показано на скриншоте).У меня есть convertRecord в каждом из идентичных потоков, и в устройстве чтения записей я использовал «Свойство поля текста схемы» в качестве стратегии доступа и указал «Текст схемы».Например:

   {
 "type": "record",

 "name": "AVLRecord0",

 "fields" : [

    {"name": "TimeOfDay", "type": "string", "logicalType":"timestamp-millis"},
    {"name":"Field1", "type": "double"},
    {"name":"Field2", "type": "double"},
    {"name":"Field3", "type": "double"},
    {"name": "Filename", "type": "string"}
]

}

Допустим, вышеприведенная схема, которую я использовал в различных параллельных потоках ConvertRecord, и теперь я хочу обновить одно имя поля из Поле до Field_Name , так что есть ли способ?Я могу сделать это за один раз по всей записи преобразования текста схемы?

Если я хочу изменить / обновить одно из полей в тексте схемы, нужно ли мне изменять / обновлять имя поля в каждом процессоре вручную?Или есть глобальный способ изменить имя поля во всем параллельном потоке, который у меня есть?

Можно ли как-нибудь обновить текст схемы на разных процессорах за один раз?

Любая помощь высоко ценится!Спасибо

enter image description here

1 Ответ

0 голосов
/ 11 декабря 2018

Поскольку вы используете Schema Text Field Property, вам нужно вручную изменить весь процессор ConvertRecord .

Попробуйте использовать такой подход:

В процессоре ConvertRecord используйте Стратегия доступа к схеме как

Use Schema Name Property

Затем настройте AvroSchemaRegistry и определите свою схему, добавив новое свойство

enter image description here

Я добавил sch в качестве имени схемы и определил схему avro.

После процессора GetFile используйте процессор UpdateAttribute и добавьтеschema.name атрибут (например, со значением sch) для файла потока.

  • Теперь в службе контроллера считывателя используйте стратегию доступа к схеме в качестве Use Schema Name Property and Schema Registry as AvroSchemaRegistry`, которая уже имеетнастроить.enter image description here

Следуя этому пути, мы не определяем схему на всех ConvertRecordвместо процессоров мы ссылаемся на ту же схему , которая определена в AvroSchemaRegistry, на случай, если вы захотите изменить одно имя поля, можно легко зайти в реестр и изменить значение.

Поток:

1.GetFile
2.UpdateAttribute //add schema.name attribute
3.ConvertRecord //define/use AvroSchemaRegistry and access strategy as schemaname property
..other processors

См. эту ссылку для получения более подробной информации относительно определения / использования AvroSchemaRegistry.

...