Есть топик Кафки c, на который приходят сообщения. Мне нужно прочитать сообщение, обработать его и перейти к следующему сообщению. Обработка сообщений может завершиться сбоем, и если это произойдет, обработку придется повторить несколько раз (скажем, 10 раз), прежде чем я смогу перейти к следующему сообщению. Если обработка завершается 10 раз, сообщение необходимо отбросить, и мы должны продолжить со следующего сообщения.
Мы используем reactor-kafka
, вся обработка должна быть реактивной.
Здесь Вот как я пытался решить это:
Flux.defer(receiver::receive)
.concatMap(this::processRecord)
.retryBackoff(10, ofMillis(500))
.concatMap(record -> record.receiverOffset().commit())
.subscribe();
(здесь receiver
является KafkaReceiver<String, String>
).
Это работает для случая без каких-либо исключений, и если есть исключение , processRecord()
повторяется 10 раз. Проблема здесь заключается в том, что если после 10 разрешенных попыток он все еще не работает, смещение не фиксируется (конечно), поэтому в следующий раз такое же смещение будет считано из Kafka, так что, по сути, обработка застревает навсегда с ошибкой смещения .
Я попытался реализовать следующую очевидную идею: если исключение «проходит дальше», чем оператор retryBackoff()
, фиксирует текущее смещение. Чтобы зафиксировать смещение, нам нужен ReceiverRecord
, поэтому я добавил обтекание исключения в ExceptionWithRecord
вместе с текущей записью:
// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))
и
Flux.defer(receiver::receive)
.concatMap(this::processRecord)
.retryBackoff(10, ofMillis(500))
.concatMap(record -> record.receiverOffset().commit())
.onErrorResume(this::extractRecordAndMaybeCommit)
.subscribe();
extractRecordAndMaybeCommit()
извлекает ReceiverRecord
из данного исключения и фиксирует его:
return record.receiverOffset().commit();
Этот метод передачи записи и последующей ее фиксации, если повторные попытки исчерпаны, работает, и вызывается метод .commit()
. но это не имеет никакого эффекта.
Оказывается, что, поскольку любое исключение входит в реактивный конвейер выше, вызывается DefaultKafkaReceiver.dispose()
, поэтому любая последующая попытка фиксации игнорируется. Таким образом, оказывается, что просто невозможно зафиксировать смещение, как только издатели «увидят» любое исключение.
Как можно реализовать поведение «зафиксировать после N ошибок» при использовании reactor-kafka