Как отследить, что изменилось между двумя событиями в теме? - PullRequest
0 голосов
/ 25 марта 2019

My STORE_TOPIC - это сжатая тема, которая содержит текущее состояние моих сущностей на основе событий, полученных из EVENTS_TOPIC (приложением Kafka Streams).Основная цель этого STORE_TOPIC заключается в том, чтобы быть загруженной как (глобальная) таблица KTable.

EVENTS_TOPIC                    |    STORE_TOPIC
                                |    
VALUE                           |     KEY      VALUE
{"entity":"A", "a":1, "b":2}    |     A        {"a":1, "b":2}
{"entity":"B", "c":3}           |     B        {"c":3}
{"entity":"A", "d":4}           |     A        {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5}    |     A        {"a":0, "b":2, "d":4, "e":5}

Новый потребитель должен получать уведомления, когда происходят изменения в определенном подмножестве атрибутов.

В качестве примера, если мы решили отслеживать изменения атрибутов "a", "b" и "c", ожидаемый результат будет:

A        {"a":1, "b":2}
B        {"c":3}
(Nothing)
A        {"a":0, "b":2}

Чтобы сделать это,Я написал новое приложение Kafka Streams, которое:

  • Обрабатывает «локальное хранилище», которое содержит необходимые атрибуты для каждого ключа
  • Загрузка STORE_TOPIC в виде потока
  • Сравните входящее сообщение с содержимым "локального хранилища" (спасибо .transform())
  • Если обнаружено изменение, записывает входящее сообщение (только те атрибуты, которые мы должны отслеживать) в новоеOUT_TOPIC и добавьте / замените запись в «местном магазине»

Есть ли более простой и элегантный способ добиться этого?

Можете ли вы подтвердить, чтоесли я загружаю STORE_TOPIC напрямую как KTable: .table(STORE_TOPIC), я могу получить доступ только к текущему состоянию объекта,и нет ли способа получить доступ к предыдущей версии сущности и провести на ней сравнение?

...