Как маршрутизировать ошибки из канала исполнителя в канал ошибок? - PullRequest
0 голосов
/ 27 сентября 2018

Пожалуйста, обратитесь к прилагаемой схеме для настройки.Объяснение ниже.

enter image description here

Существует общий входной канал, который принимает запросы.Из этого входного канала есть два потока:

  1. Поток 1 - сохраняет запрос в БД

  2. Поток 2 - отправляет запрос на бизнесОбработка / пересылка в другие внешние системы

Я хотел, чтобы поток 1 и поток 2 были независимы друг от друга.Поэтому я поставил поток 1 на канал исполнителя.Таким образом, ошибка в потоке 1 не нарушит поток 2.

Объяснение потока 1:

  1. Из общего входного канала код считываетзапросить и поместить его в канал исполнителя.
  2. Из канала исполнителя класс DBStore считывает запрос и сохраняет его в БД.
  3. У меня также есть канал ошибок (общий для всех классовв проекте), которая будет спокойно регистрировать ошибку

Что у меня есть:

Внутри кода в поле зеленого цвета я определил ExpressionEvaluatingRequestHandlerAdvice такчто любая ошибка на канале исполнителя отправляется на канал ошибок.Я предполагал, что ExpressionEvaluatingRequestHandlerAdvice будет автоматически применен к каналу исполнителя.

Вместо этого, если есть ошибка, он повторно помещается в «Общий входной канал» и обрабатывается многократно доочередь заполняется.

Что мне нужно:

Я хочу, чтобы любая ошибка на канале исполнителя отправлялась в канал ошибок, где она будетбудьте тихо зарегистрированы, и сообщение будет удалено.

КОД, КОТОРЫЙ ЧИТАЕТ С ОБЩЕГО ВХОДНОГО КАНАЛА И ПОСТАВЛЯЕТ НА КАНАЛ ИСПОЛНИТЕЛЯ:

    @Configuration
@EnableIntegration
public class InputChanneltoExecutorChannelConfig {

//DEFINING THE EXECUTOR CHANNEL
    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }

    @Bean(name="executorChannelToDB")
    public ExecutorChannel outboundRequests() {
        return new ExecutorChannel(taskExecutor());
    }
//DEFINE FAILURE CHANNEL FOR USE IN ExpressionEvaluatingRequestHandlerAdvice
    @Bean(name = "DBFailureChannel")
    public static MessageChannel getFailureChannel() {
        return new DirectChannel();
    }   

//MAIN METHOD THAT READS FROM INPUT CHANNEL AND SENDS TO EXECUTOR CHANNEL
    @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                /*
                 * We publish the msg to be stored into the DB onto a executor
                 * channel (so that the DB operation is processed on a separate
                 * thread).
                 */
                .channel("executorChannelToDB").get();
                /****************************************************************************
                        *********************************************************
                 * How do I route the error from executor channel to error channel over here?
                        **********************************************************
                 ****************************************************************************/
    }

    /*
     * Create an advice bean to handle DB errors. In case of failure, send
     * response to a separate channel.
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setFailureChannelName("DBFailureChannel");
        advice.setOnFailureExpressionString("'##Error while storing request into DB'");
        advice.setTrapException(true);
        return advice;
    }

    /*
     * We create a separate flow for DB failure because in future we may need
     * other actions such as retries/notify support in addition to logging.
     */
    @Bean
    public IntegrationFlow failure() {
        return IntegrationFlows.from("DBFailureChannel")
                .channel("errorChannel").get();

    }   
}

ОБНОВЛЕНИЕ: По предложению Гэри обновлены ERROR_CHANNEL и REPLY_CHANNEL.

   @Bean
    public IntegrationFlow outboundtoDB() {
        return IntegrationFlows
                .from("commonInputChannel")
                //Setting Headers
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
                .enrichHeaders(h -> h.header(MessageHeaders.REPLY_CHANNEL, "DBSuccessChannel", true))
                .channel("executorChannelToDB").get();

Канал DBSuccess настроен на обработку ответа, подобного этому:

@Bean
public IntegrationFlow success() {
    return IntegrationFlows
            .from("DBSuccessChannel")
            .wireTap(
                    flow -> flow.handle(msg -> logger
                            .info("Response from storing in DB : "
                                    + msg.getPayload()))).get();
}

Но я все еще получаю ошибку,

2018-09-26 23: 34: 47.398 ОШИБКА 17186 --- [SimpleAsyncTaskExecutor-465] osintegration.handler.LoggingHandler: org.springframework.messaging.MessageHandlingException: вложенное исключение составляетjava.time.format.DateTimeParseException: не удалось проанализировать текст «отметка времени создания образца» с индексом 0, failedMessage = GenericMessage [payload=com.td.sba.iep.schema.InstructionRs@37919153, headers = {errorChannel = errorChannel, jms_destination = commonInputChannel , Solace_JMS_Prop_IS_Reply_Message = ложь, приоритет = 0, jms_timestamp = 1538018141672, JMS_Solace_isXML = верно, replyChannel = DBSuccessChannel, jms_redelivered = верно, JMS_Solace_DeliverToOne = ложь, JMS_Solace_ElidingEligible = ложь, JMS_Solace_DeadMsgQueueEligible = ложь, ID = ff6c2ea6-b6d6-c67a-7943-6b7db33bb977, jms_messageId = ID: 49.37.4.163d608166190664e70: 0, отметка времени = 1538019287394}] *

Здесь jms_destination по-прежнему устанавливается в качестве входного канала, и ошибки сохраняются в качестве входного канала, и ошибки сохраняютсявозвращаться в commonInputChannel.Можете ли вы помочь?

1 Ответ

0 голосов
/ 27 сентября 2018

Этот совет не поможет, поскольку он применяется только к этой конечной точке, а не к нисходящему потоку, и, в любом случае, даже если он это сделал, передача обслуживания исполнителю будет успешной, а любые последующие исключения обработаны исполнителем (который обернут в ErrorHandlingTaskExecutor с MessagePublishingErrorHandler).

Попробуйте заменить этот компонент на обогащающий заголовок и установите заголовок errorChannel.Или вы можете обернуть TE самостоятельно с помощью MPEH, настроенного с вашим каналом ошибок (канал исполнителя обнаружит, что TE уже является EHTE).

EDIT

Thisу меня отлично работает ...

@SpringBootApplication
public class So52526134Application {

    public static void main(String[] args) {
        SpringApplication.run(So52526134Application.class, args);
    }

    @Bean
    public IntegrationFlow mainFlow() {
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5000)))
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "myErrors.input"))
                .channel(MessageChannels.executor(executor()))
                .handle((p, h) -> {
                    throw new RuntimeException("foo");
                })
                .get();
    }

    @Bean
    public IntegrationFlow myErrors() {
        return f -> f.handle((p, h) -> {
            System.out.println("in my error flow");
            return p;
        })
        .handle(System.out::println);
    }

    @Bean
    public TaskExecutor executor() {
        return new ThreadPoolTaskExecutor();
    }

}

и

in my error flow
ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: ...
...