Kafka Stream воспроизводит непроизведенные события - PullRequest
0 голосов
/ 28 апреля 2020

В моем приложении всякий раз, когда кто-то вносит какие-либо изменения в какую-либо базу данных (через «Приложение БД»), я хочу, чтобы надлежащее уведомление было отправлено на топовый номер Kafka c. Однако я не хочу, чтобы клиент, использующий приложение БД, не работал, если приложению не удалось отправить уведомление в Kafka topi c.

Чтобы убедиться, что при каждом изменении БД генерируется хотя бы одно событие Я решил создать приложение Kafka Streams.

Оно будет прослушивать topi c, в котором должно происходить событие, и каждые 5 секунд:

  • проверять, какие события
  • сообщает приложению БД, какие события оно наблюдало в пределах временного окна, и получает в ответ набор пропущенных событий
  • отправляет эти события в исходный файл Kafka topi c

Теперь мой вопрос: как я мог это сделать sh? Я подумал, что буду использовать WindowStore для хранения наблюдаемых событий (что также хорошо решило бы проблему поздних событий). Однако я не знаю, как подключить логи c из пунктов выше в мой поток. Не могли бы вы помочь?

Что я пробовал:

public Topology createTopology() {
    Duration windowSize = Duration.ofSeconds(5);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    streamsBuilder.streams(
        this.inputTopics, Consumed.with(this.keySerde, this.valueSerde, this.timestampExtractor, null)
    ).groupedByKey(  // I'm not even sure why this needs to be done
        Grouped.with(this.keySerde, this.valueSerde)
    ).windowedBy(
        TimeWindows.of(windowSize)
            .advancedBy(windowSize)
            .grace(Duration.ofSeconds(1))
    )
    // ??? - getting missing events from DB App and sending them to input topic
}
...