Настройка
У меня есть приложение Spring Boot, которое называется Диспетчер . Он работает на 1 компьютере и имеет встроенный брокер ActiveMQ:
@Bean
public BrokerService broker(ActiveMQProperties properties) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector(properties.getBrokerUrl());
return broker;
}
, который записывает задачи в очередь JMS:
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(taskQueue())
.bridge(Bridges.blockingPoller(outboundTaskScheduler()))
.transform(outboundTransformer)
.handle(Jms.outboundAdapter(connectionFactory)
.extractPayload(false)
.destination(JmsQueueNames.STANDARD_TASKS))
.get();
}
@Bean
public QueueChannel standardTaskQueue() {
return MessageChannels.priority()
.comparator(TASK_PRIO_COMPARATOR)
.get();
}
// 2 more queues with different names but same config
Приложение Worker работает на 10 Машины с 20 ядрами в каждом и настроены так:
@Bean
public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
int maxWorkers = 20;
return IntegrationFlows
.from(Jms.channel(connectionFactory)
.sessionTransacted(true)
.concurrentConsumers(maxWorkers)
.taskExecutor(
Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
)
.destination(JmsQueueNames.STANDARD_TASKS))
.channel(ChannelNames.TASKS_INBOUND)
.get();
}
// 2 more inbound queues with different names but same config
Это повторяется для 2-й очереди плюс 1 особый случай. Таким образом, имеется всего 401 потребителей .
Наблюдение
Используя JConsole, я вижу, что в очереди ActiveMQ есть задачи:
[ Скриншот вставки TODO]
Как и ожидалось, на любом компьютере Worker имеется 20 пользовательских потоков:
[Скриншот вставки TODO]
Но большинство, если не все, idle , хотя в очереди все еще есть сообщения. Используя наш инструмент мониторинга, я вижу, что в каждый конкретный момент времени обрабатывается от 50 до 400 задач, когда ожидание равно 400.
Я также заметил, что Spring создает AbstractPollingMessageListenerContainer
для каждого потребителя, что кажется в результате будет открыто 1 соединение JMS для каждого приложения в очереди в секунду (33 соединения в секунду).
Расследование
Итак, я обнаружил Я не получаю сообщения от моего второго потребителя , что намекает на то, что prefetch
является виновником. Это звучало правдоподобно, поэтому я настроил tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1
для каждого работника. Затем, однако, в любой момент обрабатывалось только около 25 задач, которые для меня не имели никакого смысла.
Вопрос
Кажется, я не понимаю, что происходит, и с тех пор, как я ' У меня не хватает времени для расследования, я надеялся, что кто-нибудь сможет указать мне правильное направление. Какие факторы могут быть причиной? Количество потребителей / подключений? Предварительная выборка? Что-нибудь еще?