при использовании @StreamListener настройки KafkaListenerContainerFactory отражаются в сгенерированном KafkaMessageListenerContainer? - PullRequest
1 голос
/ 07 апреля 2020

Я использую spring-cloud-stream с kafka binder для получения сообщения от kafka. Приложение в основном потребляет сообщения от kafka и обновляет базу данных.

Есть сценарий ios, когда БД не работает (что может длиться часами) или другие временные технические проблемы. Поскольку в этих сценариях ios нет смысла повторять сообщение в течение ограниченного периода времени и затем перемещать его в DLQ, я пытаюсь достичь бесконечного числа попыток, когда мы получаем исключения определенного типа (например, DBHostNotAvaialableException)

Чтобы добиться этого, я попробовал 2 подхода (сталкиваясь с проблемами в обоих подходах) -

  1. В этом подходе пробовал установить обработчик ошибок для свойств контейнера при настройке bean-компонента ConcurrentKafkaListenerContainerFactory но обработчик ошибок не запускается вообще. При отладке потока я понял, что в создаваемом KafkaMessageListenerContainer поле errorHandler имеет значение null, следовательно, они используют LoggingErrorHandler по умолчанию. Ниже приведены мои конфигурации bean-компонентов фабрики контейнеров - метод @StreamListener для этого подхода такой же, как и для второго подхода, за исключением поиска по потребителю.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> 
     kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.getContainerProperties().setAckOnError(false);
        ContainerProperties containerProperties = factory.getContainerProperties();
         // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
    

Я что-то упустил при настройке фабричного компонента или этот компонент актуален только для @KafkaListener, а не @StreamListener ??

Второй альтернативой была попытка достичь этого с помощью ручного подтверждения и поиска. Внутри метода @StreamListener, получающего Подтверждение и Потребителя из заголовков, в случае получения повторяющегося исключения, я делаю определенное количество попыток, используя retrytemplate и когда те исчерпаны, я запускаю consumer.seek(). Пример кода ниже -

@StreamListener(MySink.INPUT)
public void processInput(Message<String> msg) {

MessageHeaders msgHeaders = msg.getHeaders();
Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class);
Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class);


try {
  retryTemplate.execute(
            context -> {
             // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException
                consumeMessage(msg.getPayload());
                return null;
            }
    );
}
catch (DBHostNotAvaialableException ex) {
  // once retries as per retrytemplate are  exhausted do a seek

    consumer.seek(new TopicPartition(topicName, partition), offset);

}
catch (Exception ex) {
  // if some other exception just log and put in dlq based on enableDlq property
    logger.warn("some other business exception hence putting in dlq ");
    throw ex;
}

if (ack != null) {
    ack.acknowledge();
}

}

Проблема с этим подходом - так как я выполняю customer.seek (), хотя может быть ожидающие записи из последнего опроса, которые могут быть обработаны и зафиксированы, если БД появится в течение этого периода (следовательно, не в порядке). Есть ли способ очистить эти записи во время выполнения поиска?

PS - мы в настоящее время находимся в версии 2.0.3.RELEASE для весенней загрузки и Finchley.RELEASE или для облачных зависимостей Spring (следовательно, не можем использовать такие функции, как отрицательные либо подтверждение, и обновление невозможно в данный момент).

1 Ответ

0 голосов
/ 07 апреля 2020

Spring Cloud Stream не использует фабрику контейнеров. Я уже объяснил вам, что в этот ответ .

Версия 2.1 представила ListenerContainerCustomizer, и если вы добавите бин такого типа, он будет вызван после создания контейнера.

Spring Boot 2.0 прекратил работу в течение года go и больше не поддерживается.

Ответ, о котором я вам говорил, показывает, как вы можете использовать отражение для добавления обработчика ошибок.

Выполнение поиска в слушателе будет работать только при наличии max.poll.records=1.

...