Интеграция Spring - Scatter-Gather - PullRequest
1 голос
/ 10 июля 2020

Я использую обработчик Spring Integration и Scatter Gather (https://docs.spring.io/spring-integration/docs/5.3.0.M1/reference/html/scatter-gather.html) для отправки 3 параллельных запросов (с использованием ExecutorChannels) внешним API REST и объединения их ответа в одно сообщение.

Все работает нормально, пока в методе Aggregator aggregatePayloads (AggregatingMessageHandler) не возникнет исключение. В этом сценарии сообщение об ошибке успешно доставляется на шлюз обмена сообщениями, который инициировал поток (вызывающий). Однако поток ScatterGatherHandler остается в подвешенном состоянии, ожидая ответа сборщика (я полагаю), который никогда не приходит из-за исключения внутри него. Т.е. каждый последовательный вызов оставляет один дополнительный поток в "зависшем" состоянии, и в конечном итоге в пуле потоков заканчиваются доступные рабочие потоки.

Моя текущая конфигурация Scatter Gather:

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setChannels(Arrays.asList(Channel1(asyncExecutor()),Channel2(asyncExecutor()),Channel3(asyncExecutor())));
    return router;
}

@Bean
public MessageHandler gatherer() {
    AggregatingMessageHandler aggregatingMessageHandler = new AggregatingMessageHandler(
            new TransactionAggregator(),
            new SimpleMessageStore(),
            new HeaderAttributeCorrelationStrategy("correlationID"),
            new ExpressionEvaluatingReleaseStrategy("size() == 3"));
    aggregatingMessageHandler.setExpireGroupsUponCompletion( true );
    return aggregatingMessageHandler;
}

@Bean
@ServiceActivator(inputChannel = "validationOutputChannel")
public MessageHandler scatterGatherDistribution() {
    ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
    handler.setErrorChannelName("scatterGatherErrorChannel");

    return handler;
}


@Bean("taskExecutor")
@Primary
public TaskExecutor asyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("AsyncThread-");
    executor.initialize();
    return executor;
}

Пока единственное решение я обнаружил, что нужно добавить значения RequiresReply и GatherTimeout для ScatterGatherHandler, как показано ниже:

handler.setGatherTimeout(120000L);
handler.setRequiresReply(true);

Это вызовет исключение и освободит поток ScatterGatherHandler для вытягивания после указанного значения тайм-аута и после того, как исключение агрегатора будет доставлено на шлюз обмена сообщениями . Я вижу в журнале следующее сообщение:

[AsyncThread-1] [WARN] [o.s.m.c.GenericMessagingTemplate$TemporaryReplyChannel:] [{}] - Reply message received but the receiving thread has already received a reply: ErrorMessage

Есть ли другой способ добиться этого? Моя основная цель - убедиться, что я не блокирую какие-либо потоки в случае возникновения исключения в методе агрегатора aggregatePayloads.

Спасибо.

1 Ответ

1 голос
/ 10 июля 2020

Технически это действительно ожидаемое поведение. См. Документы: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#scatter -gather-error-processing

В этом случае для ScatterGatherHandler должен быть настроен разумный, конечный gatherTimeout. В противном случае по умолчанию он будет заблокирован, ожидая ответа от собирателя навсегда.

На самом деле нет никакого способа разрушить ожидания от BlockingQueue.take() из этого ScatterGatherHandler кода.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...