Получить последнее зафиксированное сообщение в разделе kafka - PullRequest
3 голосов
/ 25 марта 2019

Я использую семантику ровно один раз, предоставленную kafka. Поэтому мой продюсер пишет сообщение в транзакции. Пока мой продюсер отправлял сотое сообщение, прямо между send() и commitTransaction(); Я убил продюсера.

Я прочитал несколько последних незафиксированных сообщений в моей теме.

Consumer record offset - message number

0                     - 1
2                     - 2
196                   - 99  <- Last committed message by producer
198                   - 100 <- 100th message was sent but not committed

Теперь, когда я запускаю потребитель с read_committed уровнем изоляции. Точно читает из 1-99 сообщений. Но для этого я прочитал всю тему. В конце концов, я собираюсь хранить миллионы сообщений в теме. Таким образом, чтение всей темы не является предпочтительным.

Кроме того, предположим, что потребитель опрашивает сообщения от брокера, и существует некоторая проблема связи с агентом kafka и потребителем. Последнее сообщение, прочитанное потребителем, скажем, смещение # 50. Это означает, что я не смог надежно определить последнее зафиксированное сообщение в теме.

Я использовал другие методы, т.е.

seekToEnd() - took me to offset#200
endOffsets() - took me to offset#200

Есть ли способ получить сообщение, которое было передано производителем Kafka надежно ? (В моем случае, Offset#196)

...