Я пытаюсь создать собственный обработчик исключений для своего потока данных Spring Cloud, чтобы маршрутизировать некоторые ошибки в очередь и другие в DLQ.
Для этого я использую глобальный SpringИнтеграция "errorChannel" и маршрутизация на основе типа исключения.
Это код для маршрутизатора ошибок Spring Integration:
package com.acme.error.router;
import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);
public static final String ERROR_CHANNEL = "errorChannel";
@Router(inputChannel = ERROR_CHANNEL)
public String onError(Message<Object> message) {
LOGGER.debug("ERROR ROUTER - onError");
if(message.getPayload() instanceof MessageTransformationException) {
MessageTransformationException exception = (MessageTransformationException) message.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
if(exceptionChainContainsDlq(exception)) {
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
return ErrorMessageChannels.REQUEUE_CHANNEL;
}
return ErrorMessageChannels.DLQ_QUEUE_NAME;
}
...
}
Маршрутизатор ошибок выбирается каждым из потоковых приложений черезсканирование пакета в Spring Boot App для каждого:
@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}
Когда он развертывается и запускается на локальном сервере потоков данных Spring Cloud (версия 1.5.0-RELEASE) и выдается исключение DlqException, появляется сообщениеуспешно перенаправляется в метод onError в errorRouter, а затем помещается в раздел dlq.
Однако, когда он развертывается как док-контейнер с сервером SCDF Kubernetes (также версии 1.5.0-RELEASE), onErrorметод никогда не срабатывает.(Оператор log в начале маршрутизатора никогда не выводится)
В журналах запуска потоковых приложений похоже, что компонент правильно выбран и регистрируется как прослушиватель errorChannel, но для некоторыхпричина, когда исключения генерируются, они не обрабатываются методом onError в нашем маршрутизаторе.
Журналы запуска:
o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router
Мы используем все настройки по умолчанию для весеннего облачного потока и kafkaконфигурации связывателя:
spring.cloud:
stream:
binders:
kafka:
type: kafka
environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
environment.spring.cloud.stream.kafka.binder.zkNodes=zklist
Редактировать: добавлены аргументы pod из kubectl describe <pod>
Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher
Еще одна идея, которую мы пытались использовать, - попытка использовать Spring Cloud Stream - поддержку канала ошибок интеграции пружин дляотправить в тему брокера сообщения об ошибках, но поскольку сообщения, похоже, вообще не попадают в глобальный SpringChill Error Integration, это тоже не сработало.
Есть ли что-то особенное, что нам нужно сделать в SCDFKubernetes, чтобы включить глобальный канал Spring Integration errorChannel?
Что мне здесь не хватает?
Обновление с помощью solutiпо комментариям:
После просмотра вашей конфигурации я теперь почти уверен, что знаю, в чем проблема.У вас есть сценарий конфигурации с несколькими подшивками.Даже если вы имеете дело только с одним экземпляром связывателя, существование spring.cloud.stream.binders .... это то, что заставит фреймворк рассматривать его как мульти-связыватель.В основном это ошибка - github.com/spring-cloud/spring-cloud-stream/issues/1384.Как вы можете видеть, это было исправлено, но вам нужно обновить его до Elmhurst.SR2 или получить последний снимок (мы находимся в RC2 и 2.1.0. В любом случае, РЕЛИЗ уже через несколько недель) - Олег Жураковский
Это была действительно проблема с нашей настройкой.Вместо обновления мы на данный момент просто исключили использование мульти-связующего, и проблема была решена.