Я использую spring-cloud-stream с kafka binder для получения сообщения от kafka. Приложение в основном потребляет сообщения от kafka и обновляет базу данных.
Есть сценарий ios, когда БД не работает (что может длиться часами) или другие временные технические проблемы. Поскольку в этих сценариях ios нет смысла повторять сообщение в течение ограниченного периода времени и затем перемещать его в DLQ, я пытаюсь достичь бесконечного числа попыток, когда мы получаем исключения определенного типа (например, DBHostNotAvaialableException)
Чтобы добиться этого, я попробовал 2 подхода (сталкиваясь с проблемами в обоих подходах) -
В этом подходе пробовал установить обработчик ошибок для свойств контейнера при настройке bean-компонента ConcurrentKafkaListenerContainerFactory но обработчик ошибок не запускается вообще. При отладке потока я понял, что в создаваемом KafkaMessageListenerContainer поле errorHandler имеет значение null, следовательно, они используют LoggingErrorHandler по умолчанию. Ниже приведены мои конфигурации bean-компонентов фабрики контейнеров - метод @StreamListener для этого подхода такой же, как и для второго подхода, за исключением поиска по потребителю.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(kafkaConsumerFactory);
factory.getContainerProperties().setAckOnError(false);
ContainerProperties containerProperties = factory.getContainerProperties();
// even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Я что-то упустил при настройке фабричного компонента или этот компонент актуален только для @KafkaListener, а не @StreamListener ??
Второй альтернативой была попытка достичь этого с помощью ручного подтверждения и поиска. Внутри метода @StreamListener, получающего Подтверждение и Потребителя из заголовков, в случае получения повторяющегося исключения, я делаю определенное количество попыток, используя retrytemplate и когда те исчерпаны, я запускаю consumer.seek()
. Пример кода ниже -
@StreamListener(MySink.INPUT)
public void processInput(Message<String> msg) {
MessageHeaders msgHeaders = msg.getHeaders();
Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class);
Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class);
try {
retryTemplate.execute(
context -> {
// this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException
consumeMessage(msg.getPayload());
return null;
}
);
}
catch (DBHostNotAvaialableException ex) {
// once retries as per retrytemplate are exhausted do a seek
consumer.seek(new TopicPartition(topicName, partition), offset);
}
catch (Exception ex) {
// if some other exception just log and put in dlq based on enableDlq property
logger.warn("some other business exception hence putting in dlq ");
throw ex;
}
if (ack != null) {
ack.acknowledge();
}
}
Проблема с этим подходом - так как я выполняю customer.seek (), хотя может быть ожидающие записи из последнего опроса, которые могут быть обработаны и зафиксированы, если БД появится в течение этого периода (следовательно, не в порядке). Есть ли способ очистить эти записи во время выполнения поиска?
PS - мы в настоящее время находимся в версии 2.0.3.RELEASE для весенней загрузки и Finchley.RELEASE или для облачных зависимостей Spring (следовательно, не можем использовать такие функции, как отрицательные либо подтверждение, и обновление невозможно в данный момент).