Spring Kafka AckOnError - PullRequest
       0

Spring Kafka AckOnError

0 голосов
/ 18 февраля 2020

Я настроил SeekToErrorHandler с DeadLetterPublisheingRecoverer

 ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(primaryConsumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);               
    factory.setErrorHandler(new SeekToErrorHandler(new DeadLetterPublisheingRecoverer(kafkaTemplate()),3));

Когда из слушателя (или средства проверки) выдается исключение, после трех попыток сообщение публикуется в мертвой букве.

Проблема здесь в следующий раз, когда перезапускаю мое весеннее загрузочное приложение (или контейнер слушателя), то же сообщение снова доставляется слушателю, проходит всю последовательность и, наконец, попадает в мертвую букву. Есть ли способ избежать этого?

Я отключил автоматическую фиксацию и установил AckOnError (false) и AckMode (AckMode.RECORD);

В SeekToErrorHandler я обнаружил, что логика c вокруг SeekToUtil выдает исключение до тех пор, пока не будет выполнено настроенное число итераций и, наконец, не вызовет метод accept BiConsumer (публикация deadletter). Таким образом, контейнер должен зафиксировать запись на последнем этапе (при публикации в мертвую букву), верно? Я также просмотрел комментарий к методу ackOnError (boolean) в org.springframework.kafka.listener.ContainerProperties

Когда setAckOnError (true), я мог найти правильное поведение с тремя повторными попытками и, наконец, вызвать издателя мертвых писем. Сообщение не возвращается повторно при перезапуске контейнера слушателя

Версия Spring kafka - 2.2.6

1 Ответ

0 голосов
/ 18 февраля 2020

В 2.3 мы добавили ackAfterHandle; с значением по умолчанию, равным true для SeekToCurrentErrorHandler.

@Override
public boolean isAckAfterHandle() {
    return this.ackAfterHandle;
}

/**
 * Set to false to tell the container to NOT commit the offset for a recovered record.
 * @param ackAfterHandle false to suppress committing the offset.
 * @since 2.3.2
 */
public void setAckAfterHandle(boolean ackAfterHandle) {
    this.ackAfterHandle = ackAfterHandle;
}

В версии 2.4 по умолчанию используется значение true для всех обработчиков ошибок.

https://github.com/spring-projects/spring-kafka/issues/1273

...