My STORE_TOPIC
- это сжатая тема, которая содержит текущее состояние моих сущностей на основе событий, полученных из EVENTS_TOPIC
(приложением Kafka Streams).Основная цель этого STORE_TOPIC
заключается в том, чтобы быть загруженной как (глобальная) таблица KTable.
EVENTS_TOPIC | STORE_TOPIC
|
VALUE | KEY VALUE
{"entity":"A", "a":1, "b":2} | A {"a":1, "b":2}
{"entity":"B", "c":3} | B {"c":3}
{"entity":"A", "d":4} | A {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5} | A {"a":0, "b":2, "d":4, "e":5}
Новый потребитель должен получать уведомления, когда происходят изменения в определенном подмножестве атрибутов.
В качестве примера, если мы решили отслеживать изменения атрибутов "a", "b" и "c", ожидаемый результат будет:
A {"a":1, "b":2}
B {"c":3}
(Nothing)
A {"a":0, "b":2}
Чтобы сделать это,Я написал новое приложение Kafka Streams, которое:
- Обрабатывает «локальное хранилище», которое содержит необходимые атрибуты для каждого ключа
- Загрузка
STORE_TOPIC
в виде потока - Сравните входящее сообщение с содержимым "локального хранилища" (спасибо
.transform()
) - Если обнаружено изменение, записывает входящее сообщение (только те атрибуты, которые мы должны отслеживать) в новое
OUT_TOPIC
и добавьте / замените запись в «местном магазине»
Есть ли более простой и элегантный способ добиться этого?
Можете ли вы подтвердить, чтоесли я загружаю STORE_TOPIC
напрямую как KTable: .table(STORE_TOPIC)
, я могу получить доступ только к текущему состоянию объекта,и нет ли способа получить доступ к предыдущей версии сущности и провести на ней сравнение?