Spring Integration JMS Потребители не используют все сообщения - PullRequest
0 голосов
/ 14 января 2020

Настройка

У меня есть приложение Spring Boot, которое называется Диспетчер . Он работает на 1 компьютере и имеет встроенный брокер ActiveMQ:

  @Bean
  public BrokerService broker(ActiveMQProperties properties) throws Exception {
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector(properties.getBrokerUrl());
    return broker;
  }

, который записывает задачи в очередь JMS:

  @Bean
  public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
      .from(taskQueue())
      .bridge(Bridges.blockingPoller(outboundTaskScheduler()))
      .transform(outboundTransformer)
      .handle(Jms.outboundAdapter(connectionFactory)
        .extractPayload(false)
        .destination(JmsQueueNames.STANDARD_TASKS))
      .get();
  }

  @Bean
  public QueueChannel standardTaskQueue() {
    return MessageChannels.priority()
      .comparator(TASK_PRIO_COMPARATOR)
      .get();
  }

  // 2 more queues with different names but same config

Приложение Worker работает на 10 Машины с 20 ядрами в каждом и настроены так:

  @Bean
  public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
    int maxWorkers = 20;
    return IntegrationFlows
      .from(Jms.channel(connectionFactory)
        .sessionTransacted(true)
        .concurrentConsumers(maxWorkers)
        .taskExecutor(
          Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
        )
        .destination(JmsQueueNames.STANDARD_TASKS))
      .channel(ChannelNames.TASKS_INBOUND)
      .get();
  }

  // 2 more inbound queues with different names but same config

Это повторяется для 2-й очереди плюс 1 особый случай. Таким образом, имеется всего 401 потребителей .

Наблюдение

Используя JConsole, я вижу, что в очереди ActiveMQ есть задачи:

[ Скриншот вставки TODO]

Как и ожидалось, на любом компьютере Worker имеется 20 пользовательских потоков:

[Скриншот вставки TODO]

Но большинство, если не все, idle , хотя в очереди все еще есть сообщения. Используя наш инструмент мониторинга, я вижу, что в каждый конкретный момент времени обрабатывается от 50 до 400 задач, когда ожидание равно 400.

Я также заметил, что Spring создает AbstractPollingMessageListenerContainer для каждого потребителя, что кажется в результате будет открыто 1 соединение JMS для каждого приложения в очереди в секунду (33 соединения в секунду).

Расследование

Итак, я обнаружил Я не получаю сообщения от моего второго потребителя , что намекает на то, что prefetch является виновником. Это звучало правдоподобно, поэтому я настроил tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1 для каждого работника. Затем, однако, в любой момент обрабатывалось только около 25 задач, которые для меня не имели никакого смысла.

Вопрос

Кажется, я не понимаю, что происходит, и с тех пор, как я ' У меня не хватает времени для расследования, я надеялся, что кто-нибудь сможет указать мне правильное направление. Какие факторы могут быть причиной? Количество потребителей / подключений? Предварительная выборка? Что-нибудь еще?

1 Ответ

0 голосов
/ 14 января 2020

Оказалось, что это вызвано политикой предварительной выборки. В моем случае правильной конфигурацией было использование tcp://dispatcher:61616?jms.prefetchPolicy.all=0

В моем предыдущем (неудачном) тесте я использовал jms.prefetchPolicy.queuePrefetch=1, но в ретроспективе я не уверен, настроил ли я его в правильном месте.

...