Как обработать запись в Kafka на основе результата обработки другой записи? - PullRequest
0 голосов
/ 29 февраля 2020

У меня есть класс @KafkaListener, который прослушивает определенный топи c и использует записи, которые содержат либо объект Person, либо объект Phone (и только один из них). Каждый Phone имеет идентификатор ссылки / корреляции с соответствующим Person. Класс слушателя выполняет определенные проверки, которые определяют c для полученного типа, сохраняет объект в базе данных и выдает ответ об успешном / неудачном переносе обратно в Kafka, который используется другим сервисом.

Так что Person может быть успешно передано без какого-либо соответствующего Phone, но Phone передача должна быть успешной, только если соответствующая Person передача прошла успешно. Я не могу понять, как реализовать эту «синхронизацию», потому что Person s и Phone s попадают в Kafka независимо как отдельные записи, и не гарантируется, что Person, соответствующий конкретному Phone, будет обрабатываться до Phone.

Можно ли вообще иметь такую ​​синхронизацию с учетом текущей архитектуры, или мне следует перепроектировать производителя и отправить пару Person / Phone в качестве отдельного типа?

Спасибо.

1 Ответ

1 голос
/ 01 марта 2020

Непонятно, как вы используете один и тот же сериализатор для разных типов объектов, но вам, вероятно, следует создать отдельные темы и / или разделить вашу текущую тему на две (см. Kafka Streams API)

Я предполагаю, что там меньше людей, чем телефонов, и в этом случае вы можете создать KTable из списка людей topi c, тогда, когда вы получите записи телефонных разговоров, вы можете выполнить левое соединение или поиск по этой таблице для некоторого идентификатора человека

Другие решения могут включать использование Kafka Connect для выгрузки записей в систему, где вы можете выполнить объединение

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...