В настоящее время я работаю над повышением производительности в потоке интеграции, пытаясь распараллелить обработку сообщений. Я реализовал все с использованием Java DSL.
Текущий поток интеграции принимает сообщения из канала очереди с фиксированным поллером и последовательно обрабатывает сообщение через несколько обработчиков, пока не достигнет окончательного обработчика, который делает некоторые окончательные вычисления, учитывающие каждый вывод предыдущего обработчика. Все они подключены в одном потоке интеграции. И в основном эти обработчики переносят вызовы на внешние системы. Здесь важно сохранить то, что сообщение не должно быть взято из очереди, пока весь нисходящий поток не будет завершен для предыдущего. Для распараллеливания мне нужны обработчики.
Текущий поток интеграции: MessageQueue -> Poller -> Обработчик 1 -> Обработчик 2 -> Обработчик X -> Окончательный обработчик
Я попытался включить параллелизм делает следующее, и это работает довольно хорошо.
MessageQueue -> Poller -> Splitter -> Executor -> Маршрутизатор с сопоставлениями подпотока с различными обработчиками -> Aggregator -> Final Handler
Проблема При таком подходе я обнаружил, что новое сообщение берется из канала очереди, прежде чем предыдущее проходит через весь нисходящий поток. Совершенно ясно, почему, добавив Splitter и Executor, можно изменить способ посещения сообщений, но дело в том, что между результатами может быть зависимость между различными сообщениями.
Вопрос в том, как я могу получать сообщения из канал очереди по одному, как бы «приостанавливать» опросчик до тех пор, пока обрабатываемое сообщение не перейдет к последней конечной точке после агрегатора? Я не знаю, как переставить компоненты или что еще я могу сделать, чтобы добиться этого.
Извините, я пытался найти ответ, но не смог его найти ... пожалуйста, обратитесь за советом. Большое спасибо
@ Blink, это то, что сработало для меня, возможно, нужен какой-то рефакторинг, и я уверен, что это можно было бы написать более элегантно. Я не эксперт, извините.
Ну вот основные элементы c:
- Интерфейс для переноса системы сообщений
Сообщение Канал, по которому Сообщение будет маршрутизироваться при вызове метода шлюза
@Bean
public DirectChannel integrationChannel() {
return MessageChannels.direct().get();
}
@MessagingGateway
interface WrappingGateway {
@Gateway(requestChannel = "integrationChannel")
TrackingLog executeIntegration(TrackingLog trackingLog);
}
TrackingLog - это модель, которую я использую для регистрации результатов в нисходящем потоке.
И в основном Я называю шлюз обертывания в потоке интеграции, который извлекает сообщения из очереди сообщений.
@Autowired
WrappingGateway integrationGateway;
@Bean
public IntegrationFlow createCatalogueChannelFlow() {
return IntegrationFlows.from(cataloguePriorityChannel())
// Queue Poller
.bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
.id("cataloguePriorityChannelBridge"))
// Call to Gateway
.handle(m -> {
this.integrationGateway
.executeIntegration(((TrackingLog) m.getPayload()));
})
.get();
}
@Bean
public IntegrationFlow startCatalogueIntegrationChannelFlow() {
return IntegrationFlows.from(integrationChannel())
// Log
.handle(trackerSupportClient, "logMessagePreExecution")
// Set TrackingLog in message Header
.enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
return ((TrackingLog) m.getPayload());
}))
....
Вся интеграция немного сложнее, она начинается с Asyn c HTTP-шлюза, преобразователей и маршрутизаторов. , магазины в mongodb, et c. Дело в том, что, как подсказал мне @Artem Bilan, вызов шлюза блокирует поток и не позволяет получателю очередей получать больше сообщений, пока текущее не будет полностью обработано.
Надеюсь, это поможет вам.