Я создал приложение, которое вызывает шлюз асинхронно, используя Splitter / Aggregator. В моем файле конфигурации я вызываю процесс через InvestmentMessagingGateway, который продолжает вызывать сплиттер. Каждое разделенное сообщение вызывает параллельный активатор службы и передает его в агрегатор.
Я поместил канал ошибок в InvestmentMessagingGateway и преобразовал каждое сообщение с ошибкой для передачи в агрегатор.
Я собираю каждое успешное и неудачное сообщение в агрегаторе как компиляцию для ответа.
Но когда я пытался поместить исключение в одно или несколько сообщений, я получаю сообщение об ошибке в своем агрегаторе
Ответное сообщение получено, но получающая нить уже получила ответ.
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="......."">
<context:component-scan base-package="com.api.investments"/>
<!--The gateway to be called in parallel-->
<gateway id="InvestmentGateway" service-interface="com.api.investments.gateways.InvestmentGateway"/>
<channel id="investmentDetailChannel"/>
<service-activator input-channel="investmentDetailChannel" ref="investmentService" method="getAccountPortfolio"/>
<!--Inbound gateway to invoke Splitter / Aggregator-->
<gateway id="InvestmentMessageGateway" service-interface="com.api.investments.gateways.InvestmentMessageGateway"
default-reply-channel="investmentAsyncReceiver" error-channel="investmentAsyncException"/>
<channel id="investmentAsyncSender"/>
<channel id="investmentAsyncReceiver"/>
<!-- Splitter for Invesment Details-->
<splitter input-channel="investmentAsyncSender" output-channel="investmentSplitChannel" id="investmentDetailsSplitter" ref="investmentComponentsSplitter" />
<channel id="investmentSplitChannel">
<queue />
</channel>
<!--Calls the Investment Gateway asynchronously using split messages ad send the response in aggregator-->
<service-activator input-channel="investmentSplitChannel" output-channel="investmentAggregateChannel" ref="investmentAsyncActivator" method="retrieveInvestmentDetailsAsync" requires-reply="true">
<poller receive-timeout="5000" task-executor="investmentExecutor" fixed-rate="50"/>
</service-activator>
<channel id="investmentAsyncException"/>
<!--Handles failed messages and pass it in aggregator-->
<transformer input-channel="investmentAsyncException" output-channel="investmentAggregateChannel" ref="invesmentErrorLogger" method="logError"/>
<!--Aggreggates successfull and failed messaged-->
<publish-subscribe-channel id="investmentAggregateChannel"/>
<aggregator input-channel="investmentAggregateChannel" output-channel="investmentAsyncReceiver" id="investmentAggregator"
ref="investmentComponentsAggregator" correlation-strategy="investmentComponentsCorrelationStrategy"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true" />
<task:executor id="investmentExecutor" pool-size="10-1000"
queue-capacity="5000"/>
</beans:beans>
Я попытался поместить свой канал ошибок в опросник активатора службы, но ошибка все та же, но на этот раз она не пошла в агрегатор.
Я также попытался поставить середину шлюза для активатора службы, как это
но ошибка стала нулевой.
<gateway id="InvestmentAsyncActivatorGateway" service-interface="com.api.investments.gateways.InvestmentAsyncActivatorGateway"
default-reply-channel="investmentAggregateChannel" error-channel="investmentAsyncException"/>
---- UPDATE ------
Это преобразователь, который обрабатывает каждое сообщение об ошибке
@Component("invesmentErrorLogger")
public class InvesmentErrorLoggerImpl implements InvestmentErrorLogger {
private final Logger logger = LoggerFactory.getLogger(Application.class.getName());
/**
* handles all error messages in InvestmentMessageGateway
* Creates an error message and pass it in the aggregator channel
* @param invesmentMessageError
* @return errorMessage
*/
@Override
public Message<ErrorDetails> logError(Message<?> invesmentMessageError) {
if(invesmentMessageError.getPayload().getClass().equals(MessagingException.class)) {
MessagingException messageException = (MessagingException) invesmentMessageError.getPayload();
AccountPortfolioRequest failedMsgPayload = (AccountPortfolioRequest) messageException.getFailedMessage().getPayload();
String logError = "Exception occured in Account Number: " + failedMsgPayload.getiAccNo();
logger.error(logError);
ErrorDetails productErrorDetail = new ErrorDetails();
productErrorDetail.setCode(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO);
productErrorDetail.setMessage(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO_DESC + ". Problem occured in Account Number: " + failedMsgPayload.getiAccNo());
Message<ErrorDetails> errorMessage = MessageBuilder.withPayload(productErrorDetail)
.setHeaderIfAbsent(InvestmentInquiryConstants.INV_CORRELATION_STRATEGY, InvestmentInquiryConstants.INV_CORRELATION_STRATEGY_VALUE)
.build();
return errorMessage;
}
else if(invesmentMessageError.getPayload().getClass().equals(MessageDeliveryException.class)) {
MessageDeliveryException messageException = (MessageDeliveryException) invesmentMessageError.getPayload();
AccountPortfolioRequest failedMsgPayload = (AccountPortfolioRequest) messageException.getFailedMessage().getPayload();
String logError = "Exception occured in Account Number: " + failedMsgPayload.getiAccNo();
logger.error(logError);
ErrorDetails productErrorDetail = new ErrorDetails();
productErrorDetail.setCode(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO);
productErrorDetail.setMessage(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO_DESC + ". Problem occured in Account Number: " + failedMsgPayload.getiAccNo());
Message<ErrorDetails> errorMessage = MessageBuilder.withPayload(productErrorDetail)
.setHeaderIfAbsent(InvestmentInquiryConstants.INV_CORRELATION_STRATEGY, InvestmentInquiryConstants.INV_CORRELATION_STRATEGY_VALUE)
.build();
return errorMessage;
}
else {
Exception messageException = (Exception) invesmentMessageError.getPayload();
String logError = "Exception occured in Investment Gateway ";
logger.error(logError);
logger.equals(messageException.getMessage());
ErrorDetails productErrorDetail = new ErrorDetails();
productErrorDetail.setCode(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO);
productErrorDetail.setMessage(InvestmentAPIErrorMessages.SVC_ERR_INQACCNTPORTFOLIO_DESC + " " + messageException.getMessage());
Message<ErrorDetails> errorMessage = MessageBuilder.withPayload(productErrorDetail)
.setHeaderIfAbsent(InvestmentInquiryConstants.INV_CORRELATION_STRATEGY, InvestmentInquiryConstants.INV_CORRELATION_STRATEGY_VALUE)
.build();
return errorMessage;
}
}
}