Я использую семантику ровно один раз, предоставленную kafka. Поэтому мой продюсер пишет сообщение в транзакции. Пока мой продюсер отправлял сотое сообщение, прямо между send()
и commitTransaction()
; Я убил продюсера.
Я прочитал несколько последних незафиксированных сообщений в моей теме.
Consumer record offset - message number
0 - 1
2 - 2
196 - 99 <- Last committed message by producer
198 - 100 <- 100th message was sent but not committed
Теперь, когда я запускаю потребитель с read_committed
уровнем изоляции. Точно читает из 1-99 сообщений. Но для этого я прочитал всю тему. В конце концов, я собираюсь хранить миллионы сообщений в теме. Таким образом, чтение всей темы не является предпочтительным.
Кроме того, предположим, что потребитель опрашивает сообщения от брокера, и существует некоторая проблема связи с агентом kafka и потребителем. Последнее сообщение, прочитанное потребителем, скажем, смещение # 50. Это означает, что я не смог надежно определить последнее зафиксированное сообщение в теме.
Я использовал другие методы, т.е.
seekToEnd() - took me to offset#200
endOffsets() - took me to offset#200
Есть ли способ получить сообщение, которое было передано производителем Kafka надежно ? (В моем случае, Offset#196
)