Зафиксировать смещение к Кафке в реакторе-кафке после того, как несколько попыток обработки потерпели неудачу - PullRequest
0 голосов
/ 09 апреля 2020

Есть топик Кафки c, на который приходят сообщения. Мне нужно прочитать сообщение, обработать его и перейти к следующему сообщению. Обработка сообщений может завершиться сбоем, и если это произойдет, обработку придется повторить несколько раз (скажем, 10 раз), прежде чем я смогу перейти к следующему сообщению. Если обработка завершается 10 раз, сообщение необходимо отбросить, и мы должны продолжить со следующего сообщения.

Мы используем reactor-kafka, вся обработка должна быть реактивной.

Здесь Вот как я пытался решить это:

Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .subscribe();

(здесь receiver является KafkaReceiver<String, String>).

Это работает для случая без каких-либо исключений, и если есть исключение , processRecord() повторяется 10 раз. Проблема здесь заключается в том, что если после 10 разрешенных попыток он все еще не работает, смещение не фиксируется (конечно), поэтому в следующий раз такое же смещение будет считано из Kafka, так что, по сути, обработка застревает навсегда с ошибкой смещения .

Я попытался реализовать следующую очевидную идею: если исключение «проходит дальше», чем оператор retryBackoff(), фиксирует текущее смещение. Чтобы зафиксировать смещение, нам нужен ReceiverRecord, поэтому я добавил обтекание исключения в ExceptionWithRecord вместе с текущей записью:

// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))

и

Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .onErrorResume(this::extractRecordAndMaybeCommit)
        .subscribe();

extractRecordAndMaybeCommit() извлекает ReceiverRecord из данного исключения и фиксирует его:

return record.receiverOffset().commit();

Этот метод передачи записи и последующей ее фиксации, если повторные попытки исчерпаны, работает, и вызывается метод .commit(). но это не имеет никакого эффекта.

Оказывается, что, поскольку любое исключение входит в реактивный конвейер выше, вызывается DefaultKafkaReceiver.dispose(), поэтому любая последующая попытка фиксации игнорируется. Таким образом, оказывается, что просто невозможно зафиксировать смещение, как только издатели «увидят» любое исключение.

Как можно реализовать поведение «зафиксировать после N ошибок» при использовании reactor-kafka

1 Ответ

0 голосов
/ 16 апреля 2020

Мне не удалось найти «правильный» и простой способ решения задачи, поэтому мне пришлось прибегнуть к грубой силе состояния и побочных эффектов: подсчитать повторные попытки вручную и прекратить повторные попытки, когда количество попыток превысит предел .

Вот счетчик:

public class RetryCounter<K, V> {
    private final Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
    private final Map<TopicPartition, Long> attemptNumbers = new ConcurrentHashMap<>();

    public void onRecord(ReceiverRecord<K, V> record) {
        TopicPartition partition = record.receiverOffset().topicPartition();
        Long currentOffset = offsets.get(partition);

        final long nextAttempt;
        if (currentOffset == null || record.receiverOffset().offset() != currentOffset) {
            nextAttempt = 0;
        } else {
            nextAttempt = attemptNumbers.getOrDefault(partition, 0L) + 1;
        }

        // it's ok to update maps non-atomically because updates and queries are never run concurrently
        offsets.put(partition, record.receiverOffset().offset());
        attemptNumbers.put(partition, nextAttempt);
    }

    public long currentAttemptFor(ReceiverRecord<K, V> record) {
        return attemptNumbers.getOrDefault(record.receiverOffset().topicPartition(), 0L);
    }
}

Затем мы подсчитываем повторы каждого смещения (независимо для каждого раздела) и прекращаем обработку при превышении количества повторов:

RetryCounter<String, String> counter = new RetryCounter<>();
Flux.defer(receiver::receive)
        .doOnNext(counter::onRecord)
        .concatMap(record -> {
            if (counter.currentAttemptFor(record) >= 10) {
                // we tried 10 times, it's 11th, so let's log the error and return
                // to avoid calling processRecord() so that there is no error
                // in the reactive pipeline and we are able to commit
                logFinalError(record);
                return record;
            } else {
                return processRecord(record).thenReturn(record);
            }
        })
        .retryBackoff(Long.MAX_VALUE, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .subscribe();
...