Я использую обработчик Spring Integration и Scatter Gather (https://docs.spring.io/spring-integration/docs/5.3.0.M1/reference/html/scatter-gather.html) для отправки 3 параллельных запросов (с использованием ExecutorChannels) внешним API REST и объединения их ответа в одно сообщение.
Все работает нормально, пока в методе Aggregator aggregatePayloads (AggregatingMessageHandler) не возникнет исключение. В этом сценарии сообщение об ошибке успешно доставляется на шлюз обмена сообщениями, который инициировал поток (вызывающий). Однако поток ScatterGatherHandler остается в подвешенном состоянии, ожидая ответа сборщика (я полагаю), который никогда не приходит из-за исключения внутри него. Т.е. каждый последовательный вызов оставляет один дополнительный поток в "зависшем" состоянии, и в конечном итоге в пуле потоков заканчиваются доступные рабочие потоки.
Моя текущая конфигурация Scatter Gather:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setChannels(Arrays.asList(Channel1(asyncExecutor()),Channel2(asyncExecutor()),Channel3(asyncExecutor())));
return router;
}
@Bean
public MessageHandler gatherer() {
AggregatingMessageHandler aggregatingMessageHandler = new AggregatingMessageHandler(
new TransactionAggregator(),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy("correlationID"),
new ExpressionEvaluatingReleaseStrategy("size() == 3"));
aggregatingMessageHandler.setExpireGroupsUponCompletion( true );
return aggregatingMessageHandler;
}
@Bean
@ServiceActivator(inputChannel = "validationOutputChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setErrorChannelName("scatterGatherErrorChannel");
return handler;
}
@Bean("taskExecutor")
@Primary
public TaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
Пока единственное решение я обнаружил, что нужно добавить значения RequiresReply и GatherTimeout для ScatterGatherHandler, как показано ниже:
handler.setGatherTimeout(120000L);
handler.setRequiresReply(true);
Это вызовет исключение и освободит поток ScatterGatherHandler для вытягивания после указанного значения тайм-аута и после того, как исключение агрегатора будет доставлено на шлюз обмена сообщениями . Я вижу в журнале следующее сообщение:
[AsyncThread-1] [WARN] [o.s.m.c.GenericMessagingTemplate$TemporaryReplyChannel:] [{}] - Reply message received but the receiving thread has already received a reply: ErrorMessage
Есть ли другой способ добиться этого? Моя основная цель - убедиться, что я не блокирую какие-либо потоки в случае возникновения исключения в методе агрегатора aggregatePayloads.
Спасибо.