Spring Cloud Dataflow errorChannel не работает - PullRequest
0 голосов
/ 27 ноября 2018

Я пытаюсь создать собственный обработчик исключений для своего потока данных Spring Cloud, чтобы маршрутизировать некоторые ошибки в очередь и другие в DLQ.

Для этого я использую глобальный SpringИнтеграция "errorChannel" и маршрутизация на основе типа исключения.

Это код для маршрутизатора ошибок Spring Integration:

package com.acme.error.router;

import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;


@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
   private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);

   public static final String ERROR_CHANNEL = "errorChannel";

   @Router(inputChannel = ERROR_CHANNEL)
    public String onError(Message<Object> message) {
      LOGGER.debug("ERROR ROUTER - onError");
      if(message.getPayload() instanceof MessageTransformationException) {
         MessageTransformationException exception = (MessageTransformationException) message.getPayload();
         Message<?> failedMessage = exception.getFailedMessage();
          if(exceptionChainContainsDlq(exception)) {
             return ErrorMessageChannels.DLQ_QUEUE_NAME;
          }
         return ErrorMessageChannels.REQUEUE_CHANNEL;
      }
      return ErrorMessageChannels.DLQ_QUEUE_NAME;
    }

    ...

}

Маршрутизатор ошибок выбирается каждым из потоковых приложений черезсканирование пакета в Spring Boot App для каждого:

@ComponentScan(basePackages = { "com.acme.error.router" }
@SpringBootApplication
public class StreamApp {}

Когда он развертывается и запускается на локальном сервере потоков данных Spring Cloud (версия 1.5.0-RELEASE) и выдается исключение DlqException, появляется сообщениеуспешно перенаправляется в метод onError в errorRouter, а затем помещается в раздел dlq.

Однако, когда он развертывается как док-контейнер с сервером SCDF Kubernetes (также версии 1.5.0-RELEASE), onErrorметод никогда не срабатывает.(Оператор log в начале маршрутизатора никогда не выводится)

В журналах запуска потоковых приложений похоже, что компонент правильно выбран и регистрируется как прослушиватель errorChannel, но для некоторыхпричина, когда исключения генерируются, они не обрабатываются методом onError в нашем маршрутизаторе.

Журналы запуска:

o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router

Мы используем все настройки по умолчанию для весеннего облачного потока и kafkaконфигурации связывателя:

spring.cloud:
  stream:
    binders:
      kafka:
        type: kafka
        environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
        environment.spring.cloud.stream.kafka.binder.zkNodes=zklist

Редактировать: добавлены аргументы pod из kubectl describe <pod>

Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher

Еще одна идея, которую мы пытались использовать, - попытка использовать Spring Cloud Stream - поддержку канала ошибок интеграции пружин дляотправить в тему брокера сообщения об ошибках, но поскольку сообщения, похоже, вообще не попадают в глобальный SpringChill Error Integration, это тоже не сработало.

Есть ли что-то особенное, что нам нужно сделать в SCDFKubernetes, чтобы включить глобальный канал Spring Integration errorChannel?

Что мне здесь не хватает?

Обновление с помощью solutiпо комментариям:

После просмотра вашей конфигурации я теперь почти уверен, что знаю, в чем проблема.У вас есть сценарий конфигурации с несколькими подшивками.Даже если вы имеете дело только с одним экземпляром связывателя, существование spring.cloud.stream.binders .... это то, что заставит фреймворк рассматривать его как мульти-связыватель.В основном это ошибка - github.com/spring-cloud/spring-cloud-stream/issues/1384.Как вы можете видеть, это было исправлено, но вам нужно обновить его до Elmhurst.SR2 или получить последний снимок (мы находимся в RC2 и 2.1.0. В любом случае, РЕЛИЗ уже через несколько недель) - Олег Жураковский

Это была действительно проблема с нашей настройкой.Вместо обновления мы на данный момент просто исключили использование мульти-связующего, и проблема была решена.

1 Ответ

0 голосов
/ 06 декабря 2018

Обновление с решением из комментариев:

После просмотра вашей конфигурации я теперь почти уверен, что знаю, в чем проблема.У вас есть сценарий конфигурации с несколькими подшивками.Даже если вы имеете дело только с одним экземпляром связывателя, существование spring.cloud.stream.binders .... это то, что заставит фреймворк рассматривать его как мульти-связыватель.В основном это ошибка - github.com/spring-cloud/spring-cloud-stream/issues/1384.Как вы можете видеть, это было исправлено, но вам нужно обновить его до Elmhurst.SR2 или получить последний снимок (мы находимся в RC2 и 2.1.0. В любом случае, РЕЛИЗ уже через несколько недель) - Олег Жураковский

Это была действительно проблема с нашей настройкой.Вместо того, чтобы обновлять, мы на данный момент просто прекратили использование мультибиндера, и проблема была решена.

...