В моем приложении всякий раз, когда кто-то вносит какие-либо изменения в какую-либо базу данных (через «Приложение БД»), я хочу, чтобы надлежащее уведомление было отправлено на топовый номер Kafka c. Однако я не хочу, чтобы клиент, использующий приложение БД, не работал, если приложению не удалось отправить уведомление в Kafka topi c.
Чтобы убедиться, что при каждом изменении БД генерируется хотя бы одно событие Я решил создать приложение Kafka Streams.
Оно будет прослушивать topi c, в котором должно происходить событие, и каждые 5 секунд:
- проверять, какие события
- сообщает приложению БД, какие события оно наблюдало в пределах временного окна, и получает в ответ набор пропущенных событий
- отправляет эти события в исходный файл Kafka topi c
Теперь мой вопрос: как я мог это сделать sh? Я подумал, что буду использовать WindowStore
для хранения наблюдаемых событий (что также хорошо решило бы проблему поздних событий). Однако я не знаю, как подключить логи c из пунктов выше в мой поток. Не могли бы вы помочь?
Что я пробовал:
public Topology createTopology() {
Duration windowSize = Duration.ofSeconds(5);
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.streams(
this.inputTopics, Consumed.with(this.keySerde, this.valueSerde, this.timestampExtractor, null)
).groupedByKey( // I'm not even sure why this needs to be done
Grouped.with(this.keySerde, this.valueSerde)
).windowedBy(
TimeWindows.of(windowSize)
.advancedBy(windowSize)
.grace(Duration.ofSeconds(1))
)
// ??? - getting missing events from DB App and sending them to input topic
}