Агрегировать сообщения, используя несколько полей в значении сообщения - PullRequest
2 голосов
/ 27 мая 2019

У меня есть одна тема Kafka с несколькими пользовательскими информационными событиями для нескольких разных пользователей.Я пытаюсь выяснить, как объединить их вместе, используя несколько полей из значения.

Например:

Входная тема:

1:{"SSN":"123456"}
2:{"twitterHandle":"elvis"}
3:{"SSN":"123456","twitterHandle":"elvis","accountNum": "111111"}
4:{"SSN":"123456"}
5:{"SSN":"000000"}
6:{"twitterHandle":"foo"}
7:{"SSN":"000000","twitterHandle":"foo"}
8:{"SSN":"000000"}

Я хочу выходную тему (агрегировано):

{"SSN":"123456","twitterHandle":"elvis","accountNum": "111111"}
{"SSN":"000000","twitterHandle":"foo"}

Как этого добиться с помощью Kafka Streams?Могу ли я создать KStream из входной темы и преобразовать его в KTable для получения выходной темы?

Обновление: в теме содержатся события от нескольких разных пользователей.Идентификаторы пользователя (SSN, twitterHandle) не являются фиксированными.Там могут быть другие идентификаторы для пользователей

1 Ответ

0 голосов
/ 29 мая 2019

Если вы слепо хотите удалить сообщения 1 и 2 и оставить сообщение 3, вы можете использовать перехватчик потребителя.

Перехватчик будет слепо анализировать сообщение json, проверять, присутствуют ли в сообщении оба ключа (и не нулевые), а затем успешно отправлять сообщение вперед, иначе не будет.В этом случае вам не нужно приложение Kstream.Только один класс-перехватчик, который нужно использовать при использовании сообщения.

Однако, если вы хотите просто прошить 1 и 2 без какого-либо общего ключа между ними, я не думаю, что это возможно, потому что мы не знаем, какойНужно объединить SSN с каким твиттером.

Дайте мне знать, могу ли я чем-нибудь помочь.

...