Я хочу включить обработку ошибок Kafka по аннотации - PullRequest
0 голосов
/ 05 марта 2020

Я попытался включить обработку ошибок spring-kafka с помощью аннотации.

Поэтому я создал бин kafkaListenerErrorHandler.

@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
    KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
        MessageHeaders headers = message.getHeaders();
        ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
                headers.get(KafkaHeaders.OFFSET, Long.class),
                headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
                (EventV1) message.getPayload());
        errorHandler.handle(exception, Collections.singletonList(consumerRecord), headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
        return null;
    };

    return kafkaListenerErrorHandler;
}

и связал его с KafkaListener на

@Autowired(required = false)
MessageListenerContainer messageListenerContainer;

@Autowired
private UserBO userBO;

@Bean
ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
}

@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
public void listen(ConsumerRecord<String, EventV1> message) {

    log.info("Received Messasge: {}", message);
    try {
        EventV1 value = message.value();
        userBO.getUserById(value.getCustomerId());
    } catch (Exception e) {
        throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
    }
}

Даже если кажется, что это работает, код кажется уродливым. kafkaListenerErrorHandler создает новый ConsumerRecord для перенаправления его в ErrorHandler, а «messageListenerContainer» имеет значение null, потому что я не выяснил, как получить его в моем контексте.

Должно быть или должно быть больше элегантный способ соединить ErrorHandler с KafkaListenerErrorHandler

Я также добавил обработчик ошибок десериализации в мою настройку.

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* Спасибо за любой совет.

1 Ответ

0 голосов
/ 05 марта 2020

Вам не нужен обработчик ошибок слушателя; Spring boot автоматически подключит errorHandler в контейнер слушателя, и он будет вызван оттуда, если слушатель выдает исключение.

Также то, что вы делаете, неправильно, так как обработчик ошибок будет искать только текущую запись и, вероятно, будет больше записей, выбранных в последнем опросе.

KafkaListenerErrorHandler s выше в стеке и используются для более сложной обработки ошибок.

Например, в В сценарии запрос / ответ (метод слушателя возвращает значение ответа) обработчик ошибок слушателя может вернуть ответ об ошибке отправителю сообщения запроса.

EDIT

Исключения могут быть классифицированы как «не повторяемые».

См. документацию .

Начиная с версии 2.3, SeekToCurrentErrorHandler считает некоторые исключения роковые, и повторные попытки пропускаются за такими исключениями; восстановитель вызывается при первом сбое. Исключения, которые по умолчанию считаются фатальными:

  • DeserializationException
  • MessageConversionException
  • MethodArgumentResolutionException
  • NoSuchMethodException
  • ClassCastException

, так как эти исключения вряд ли будут разрешены при повторной доставке.

Вы можете добавить больше типов исключений в категорию без повторов или полностью замените BinaryExceptionClassifier на свой настроенный классификатор. См. Javadoc для SeekToCurrentErrorHandler для получения дополнительной информации, а также для spring-retry BinaryExceptionClassifier.

. Вот пример, который добавляет IllegalArgumentException к исключениям, которые не повторяются:

@Bean
public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
    handler.addNotRetryableException(IllegalArgumentException.class);
    return handler;
}
...