Обогащение данных с использованием потоков kafka, KStream-GlobalKtable Join - PullRequest
0 голосов
/ 11 марта 2019

У меня есть сценарий, в котором я создаю Kafka KStream, читая данные из темы kafka.Записи KStream имеют значение key = null, значение = {объект json}, например,

null: { "ID":"1", "name":"XDFER"}
null: { "ID":"1", "name":"TRAFD"}

Реальные имена хранятся в GlobalKTable как:

XDFER : "john"
TRAFD : "albert"

Я хочу выполнить обогащение данныхитоговый результат таков:

null: { "ID":"1", "name":"john"}
null: { "ID":"1", "name":"albert"}

Я начал читать о приложениях Kafka Stream, в каждом уроке / примере обогащение данных выполняется путем сравнения ключей из KStream и GlobalKTable.В моем случае мне нужно сравнить элемент из значения записи KStream с ключом в GlobalKTable.Любые идеи или примеры, как этого можно достичь.

1 Ответ

2 голосов
/ 11 марта 2019

Входные записи для потока с ключом null или значением null игнорируются и не инициируют соединение.

Следовательно, вам необходимо повторно указать поток, чтобы имя можно было использовать в качестве ключа.

stream.selectKey(v-> v.get("name"))

После повторного ввода вы можете присоединиться к потоку с помощью GlobalKTable.

Подробное поведение вы можете прочитать здесь:

https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#kstream-globalktable-join

...