Я пытаюсь зафиксировать смещения из задания потоковой передачи Spark в Kafka, используя следующее:
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
как я понял из этого вопроса:
Spark DStream от Kafka всегда начинается с начала
И это работает нормально, смещения фиксируются. Однако проблема заключается в том, что это асинхронно, что означает, что даже после того, как еще две коммиты смещения были отправлены по линии, Кафка все еще может удерживать смещение до двух коммитов ранее. Если в этот момент происходит сбой потребителя, и я возвращаю его обратно, он начинает читать сообщения, которые уже были обработаны.
Теперь из других источников, например, раздел комментариев здесь:
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
Я понял, что нет способа синхронно фиксировать смещения из потокового задания Spark (хотя есть один, если я использую потоки Кафки). Люди скорее предлагают сохранить смещения в базах данных, где вы сохраняете конечный результат ваших вычислений в потоке.
Теперь мой вопрос таков:
Если я действительно сохраню текущее смещение чтения в своей базе данных, как мне начать чтение потока именно с этого смещения в следующий раз?