Пожалуйста, обратитесь к прилагаемой схеме для настройки.Объяснение ниже.
Существует общий входной канал, который принимает запросы.Из этого входного канала есть два потока:
Поток 1 - сохраняет запрос в БД
Поток 2 - отправляет запрос на бизнесОбработка / пересылка в другие внешние системы
Я хотел, чтобы поток 1 и поток 2 были независимы друг от друга.Поэтому я поставил поток 1 на канал исполнителя.Таким образом, ошибка в потоке 1 не нарушит поток 2.
Объяснение потока 1:
- Из общего входного канала код считываетзапросить и поместить его в канал исполнителя.
- Из канала исполнителя класс DBStore считывает запрос и сохраняет его в БД.
- У меня также есть канал ошибок (общий для всех классовв проекте), которая будет спокойно регистрировать ошибку
Что у меня есть:
Внутри кода в поле зеленого цвета я определил ExpressionEvaluatingRequestHandlerAdvice такчто любая ошибка на канале исполнителя отправляется на канал ошибок.Я предполагал, что ExpressionEvaluatingRequestHandlerAdvice будет автоматически применен к каналу исполнителя.
Вместо этого, если есть ошибка, он повторно помещается в «Общий входной канал» и обрабатывается многократно доочередь заполняется.
Что мне нужно:
Я хочу, чтобы любая ошибка на канале исполнителя отправлялась в канал ошибок, где она будетбудьте тихо зарегистрированы, и сообщение будет удалено.
КОД, КОТОРЫЙ ЧИТАЕТ С ОБЩЕГО ВХОДНОГО КАНАЛА И ПОСТАВЛЯЕТ НА КАНАЛ ИСПОЛНИТЕЛЯ:
@Configuration
@EnableIntegration
public class InputChanneltoExecutorChannelConfig {
//DEFINING THE EXECUTOR CHANNEL
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
@Bean(name="executorChannelToDB")
public ExecutorChannel outboundRequests() {
return new ExecutorChannel(taskExecutor());
}
//DEFINE FAILURE CHANNEL FOR USE IN ExpressionEvaluatingRequestHandlerAdvice
@Bean(name = "DBFailureChannel")
public static MessageChannel getFailureChannel() {
return new DirectChannel();
}
//MAIN METHOD THAT READS FROM INPUT CHANNEL AND SENDS TO EXECUTOR CHANNEL
@Bean
public IntegrationFlow outboundtoDB() {
return IntegrationFlows
.from("commonInputChannel")
/*
* We publish the msg to be stored into the DB onto a executor
* channel (so that the DB operation is processed on a separate
* thread).
*/
.channel("executorChannelToDB").get();
/****************************************************************************
*********************************************************
* How do I route the error from executor channel to error channel over here?
**********************************************************
****************************************************************************/
}
/*
* Create an advice bean to handle DB errors. In case of failure, send
* response to a separate channel.
*/
@Bean
public ExpressionEvaluatingRequestHandlerAdvice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setFailureChannelName("DBFailureChannel");
advice.setOnFailureExpressionString("'##Error while storing request into DB'");
advice.setTrapException(true);
return advice;
}
/*
* We create a separate flow for DB failure because in future we may need
* other actions such as retries/notify support in addition to logging.
*/
@Bean
public IntegrationFlow failure() {
return IntegrationFlows.from("DBFailureChannel")
.channel("errorChannel").get();
}
}
ОБНОВЛЕНИЕ: По предложению Гэри обновлены ERROR_CHANNEL и REPLY_CHANNEL.
@Bean
public IntegrationFlow outboundtoDB() {
return IntegrationFlows
.from("commonInputChannel")
//Setting Headers
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "errorChannel", true))
.enrichHeaders(h -> h.header(MessageHeaders.REPLY_CHANNEL, "DBSuccessChannel", true))
.channel("executorChannelToDB").get();
Канал DBSuccess настроен на обработку ответа, подобного этому:
@Bean
public IntegrationFlow success() {
return IntegrationFlows
.from("DBSuccessChannel")
.wireTap(
flow -> flow.handle(msg -> logger
.info("Response from storing in DB : "
+ msg.getPayload()))).get();
}
Но я все еще получаю ошибку,
2018-09-26 23: 34: 47.398 ОШИБКА 17186 --- [SimpleAsyncTaskExecutor-465] osintegration.handler.LoggingHandler: org.springframework.messaging.MessageHandlingException: вложенное исключение составляетjava.time.format.DateTimeParseException: не удалось проанализировать текст «отметка времени создания образца» с индексом 0, failedMessage = GenericMessage [payload=com.td.sba.iep.schema.InstructionRs@37919153, headers = {errorChannel = errorChannel, jms_destination = commonInputChannel , Solace_JMS_Prop_IS_Reply_Message = ложь, приоритет = 0, jms_timestamp = 1538018141672, JMS_Solace_isXML = верно, replyChannel = DBSuccessChannel, jms_redelivered = верно, JMS_Solace_DeliverToOne = ложь, JMS_Solace_ElidingEligible = ложь, JMS_Solace_DeadMsgQueueEligible = ложь, ID = ff6c2ea6-b6d6-c67a-7943-6b7db33bb977, jms_messageId = ID: 49.37.4.163d608166190664e70: 0, отметка времени = 1538019287394}] *
Здесь jms_destination по-прежнему устанавливается в качестве входного канала, и ошибки сохраняются в качестве входного канала, и ошибки сохраняютсявозвращаться в commonInputChannel.Можете ли вы помочь?