Как указать ключ для производителя кафки в apache nifi? - PullRequest
0 голосов
/ 20 сентября 2018

У меня есть простой конвейер, использующий apache nifi, и я хочу опубликовать некоторые сообщения в теме kafka, используя существующий процессор kafka puplisher.enter image description here

Проблема в том, как указать ключ kafka с помощью языка выражений apache nifi?Мне надоело что-то вроде ${message:jsonPath('$.key')}, но, конечно, я получил ошибку, потому что объект message не существует.enter image description here

Я также пытался использовать объект filename, который похож на имя объекта по умолчанию для входных сообщений, но это не помогло


Используя другой процессор издателя kafka, это возможно, установив свойство message key field, но как насчет процессора PublishKafka?

1 Ответ

0 голосов
/ 20 сентября 2018

Язык выражений NiFi может ссылаться только на атрибуты файла потока и не может напрямую ссылаться на содержимое (это делается специально).

Так что если вы хотите использовать значение поля из вашего документа json в качествеключом, затем вам нужно сначала использовать другой процессор, такой как EvaluateJsonPath, чтобы извлечь значение этого поля в атрибут файла потока.

Допустим, у вас есть поле "foo" в вашем документе json, вы можете использовать EvaluateJsonPath сназначение для установки «атрибутов файла потока», а затем добавьте динамическое свойство, например:

foo = $ .foo

Затем в PublishKafka установите для свойства ключа значение $ {foo}.

Имейте в виду, что это имеет смысл, только если у вас есть один документ json на файл потока, в противном случае, если у вас есть несколько, тогда неясно, какой это ключ, поскольку у вас может быть только один атрибут "foo" для файла потока, номного полей "foo" в содержимом файла потока.

...