Как опубликовать sh Spring Kafka DLQ в версии 2.5.4 - PullRequest
0 голосов
/ 06 августа 2020

нужна ваша помощь и руководство по этому поводу.

Я использовал spring-kafka версии 2.2.X в моем текущем проекте.

Обработка ошибок, которую я создал, выглядит следующим образом:

@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}

А затем я обновил все мои версии зависимостей проекта, такие как spring -boot и spring-kafka, до последней версии: 2.5.4 RELEASE

Я обнаружил что некоторые из методов устарели и изменены.

SeekToCurrentErrorHandler

SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
    // recover after 3 failures, woth no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));

Мой вопрос в том, как создать DLQ с этими конфигурациями:

EDITED

@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(consumerConcurrencyCount);
    factory.setErrorHandler(errorHandler());
    return factory;
}

public SeekToCurrentErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler(
            deadLetterPublishingRecoverer(),
            new FixedBackOff(0L, 2L)
    );
}

public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(
            getEventKafkaTemplate(),
            (record, ex) -> {
                if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
                    return new TopicPartition("topic-undelivered", -1);
                }

                return new TopicPartition("topic-fail", -1);
            });
}

public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}

Эти конфигурации работают, спасибо Гэри!

Заранее спасибо

1 Ответ

0 голосов
/ 06 августа 2020

Непонятно, что вы имеете в виду под

Проблема в том, что в документации все еще используется старый метод, который не рекомендуется для версии 2.5.X

1007 * - это интерфейс, который реализует KafkaTemplate; единственное изменение, которое вам нужно сделать, это изменить maxAttempts на BackOff ...

@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), new FixedBackOff(0, 2L));
    return factory;
}

public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
    return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
            (record, ex) -> new TopicPartition("topic-undelivered", -1));
}
...