NiFi: лучший способ вставить данные из Кафки в Кассандру? - PullRequest
0 голосов
/ 12 декабря 2018

Я потратил 2 дня на свои исследования, и теперь мне нужна ваша помощь, ребята.Заранее спасибо.

У меня есть следующий поток: 1) ConsumeKafka (сообщения в формате JSON) 2) EvaluateJsonPath 3) UpdateAttributes 4) AttributesToJson

Весь вышеуказанный поток работает, но следующий остаток потока не работает:5) PutCassandraRecord (мне нужна помощь по настройке этого процессора. Я знаю свой сервер Cassandra, порт, пространство ключей, имя таблицы, устройство чтения записей - JsonPathReader).Что-то еще???6) добавлен сервис контроллера - JsonPathReader (здесь мне нужна помощь по настройке этого ридера).7) Я получаю исключение, как прикрепленный файл ниже.Где и как мне получить или настроить Schema Registry?enter image description here

Я проверил этот вопрос и ответ: Apache Nifi / Cassandra - как загрузить CSV в таблицу Cassandra

Ребята, если мой потокнеправильно, пожалуйста, поправьте меня.Спасибо.

С уважением, Йешвант

1 Ответ

0 голосов
/ 12 декабря 2018

Есть multiple ways, которые мы можем настроить Устройство чтения / записи записей Служба контроллера

Я попытаюсь объяснить следующие две Стратегия доступа к схеме

  • Использовать свойство «Имя схемы»
  • Использовать свойство «Текст схемы»

Использовать свойство SchemaText:

В этой стратегии доступа процессор будет искать атрибут avro.schema в атрибутах VariableRegistry / FlowfileAttributes (или), которые мы можем дать schema in the property value.

Пример:

Я указал значение свойства текста схемы в качестве моей авро-схемы enter image description here

Использовать свойство «Имя схемы»:

enter image description here

В этой стратегии процессор проверяет Schema Name значение свойства ${valor.vaengine} (это имя атрибута), поэтому нам нужно иметь значение для этого атрибута с потоком.

Тогда служба контроллера использует ${valor.vaengine} значение использует соответствующую схему из AvroSchemaRegistry, которая использовалась этим контроллеромservice.

В вашем случае ваш потоковый файл не имеет атрибута ${valor.vaengine}, чтобы добавить этот атрибут в потоковый файл, используйте UpdateAttribute процессор, добавьте новое свойство как

valor.vaengine

<schema_name_in_avroschemaregistry>

Используйте этот шаблон для получения более подробной информации о настройке / использовании Record Reader/writer служб контроллера


Выиспользуя JsonPathReader службу контроллера для этой службы контроллера

нам нужно добавить по крайней мере одно пользовательское свойство , чтобы включить службу контроллера, например, имя свойства как id значение как $.id

...