Увеличьте скорость потребления сообщений - PullRequest
0 голосов
/ 08 мая 2020

Мы используем ActiveMQ в качестве брокера сообщений.

Для одной из очередей мы обнаружили, что скорость, с которой создается сообщение, намного выше, чем скорость, с которой сообщения потребляются. Иногда это приводит к сбою ActiveMQ.

Итак, мы изучаем варианты увеличения скорости потребления. (Первым приоритетом является увеличение скорости существующего потребителя, а затем увеличение количества подов / экземпляров потребителей.) база данных несколько раз и может быть квалифицирована как задача, связанная с вводом-выводом. Итак, мы думаем об обработке нескольких сообщений параллельно.

Мы нашли следующие варианты.

  1. Мы установили concurreny на 1-1, подразумевая, что concurrentConsumers и maxConcurrentConsumers равно 1, и одновременно обрабатывается только 1 сообщение. Это конфигурация, которую мы можем увеличить, чтобы сказать 5-10, чтобы минимум 5 и максимум 10 потребителей могли работать одновременно.

  2. Мы также обнаружили, что мы также можем установить TaskExecutor на фабрике слушателей. Как и установка threadPoolExecutor (corePoolSize = 5, maxPoolSize = 10, queueCapacity=50), также поможет нам одновременно обрабатывать сообщения.

  3. Я не уверен, какой вариант использовать (1 или 2)

  4. Если мы увеличим параллелизм до 5-10, а также установим ThreadPoolExecutor, каковы будут его поведение и влияние?
  5. Есть ли указатели на выбор оптимальных значений параметров?

Ответы [ 3 ]

2 голосов
/ 08 мая 2020

JavaDo c для DefaultMessageListenerContainer.setTaskExecutor() говорит следующее:

Установите Spring TaskExecutor на использование для запуска потоков слушателя.

По умолчанию это SimpleAsyncTaskExecutor, запускающее количество новых потоков в соответствии с указанным количеством одновременных потребителей.

Укажите альтернативу TaskExecutor для интеграции с существующим пулом потоков. Обратите внимание, что это действительно добавляет ценность только в том случае, если потоки управляются определенным образом c, например, в среде Java EE. Обычный пул потоков не добавляет особой ценности, поскольку этот контейнер слушателя будет занимать несколько потоков на протяжении всего своего жизненного цикла.

Итак, основной вариант использования для настройки исполнителя задачи - интеграция с существующий пул потоков. Исполнитель по умолчанию уже будет масштабироваться в соответствии с количеством одновременно настроенных потребителей . Поэтому я бы не рекомендовал вам устанавливать исполнителя задач.

Я бы рекомендовал просто установить параллелизм.

Вам нужно будет определить оптимальные значения с помощью тщательного тестирования и анализа. Перед тем, как вы начнете этот процесс, я настоятельно рекомендую вам установить конкретную цель производительности c, потому что без этой цели тестирование и оптимизация могут превратиться в бесконечные задачи.

Вы также можете рассмотреть возможность использования управления потоком для производителей , чтобы очереди не были полностью заполнены.

1 голос
/ 10 мая 2020

Я согласен с предыдущими ответами.

Когда у вас низкая производительность потребления сообщений с Spring DefaultMessageListenerContainer (DML C), это почти всегда означает, что потребитель / сеанс / соединение JMS воссоздаются для каждого прочитанного сообщения.

В случае отсутствия транзакции вы можете кэшировать DML C потребителя. DML C будет использовать одно соединение JMS и запускать несколько сеансов JMS для соединения. У каждого сеанса будет получатель сообщений JMS.

В случае транзакции у вас не будет кеширования в DML C, и вам придется использовать фабрику кэширующих соединений, чтобы избежать проблем с производительностью. Посмотрите на org.apache.activemq.pool.PooledConnectionFactory или org.messaginghub.pooled.jms.JmsPoolConnectionFactory для этой функции.

Что касается ваших вопросов, я бы также не стал беспокоиться о TaskExecutor. Я бы порекомендовал для начала параллелизм 5-5. Я обнаружил, что одинаковое минимальное и максимальное количество потребителей (просто наличие прибитого пула) дает преимущества в стабильности и производительности.

Что касается сбоя ActiveMQ, это связано с предварительной выборкой ActiveMQ. По умолчанию ActiveMQ имеет значение предварительной выборки 1000. Потребитель сообщения JMS запрашивает сообщение, а брокер доставляет это сообщение и 999 других сообщений в предварительной выборке. Если потребитель / сеанс затем закрывается, 999 сообщений отбрасываются на стороне клиента, а затем повторно ставятся в очередь на брокере. Это очень оскорбительно для брокера и не очень хорошо обрабатывается.

Также имейте в виду, что если у вас есть 5 одновременных потребителей, например, тогда первый потребитель получит 1000 сообщений, а затем следующий получит 1000 сообщений и так далее. Таким образом, если у вас в очереди всего 500 сообщений, тогда будет активен только один потребитель. Вам потребуется 5000 сообщений в очереди для активации всех 5 потребителей.

В случае сомнений отключите предварительную выборку сообщений в конфигурации клиента:

tcp://broker_uri:61616?jms.prefetchPolicy.all=0

1 голос
/ 08 мая 2020

Какова ваша наблюдаемая пропускная способность в msg / s и MB / s?

По моему опыту, вы заметите, что самая быстрая пропускная способность будет наблюдаться при пакетных транзакциях через PooledConnectionFactory с использованием прямого JMS API. Шаблон Spring JMS может быть сложным в настройке и настройке, особенно с многопоточностью и транзакциями. Он также может закрывать такие объекты, как Consumer и Session для каждого сообщения, что устраняет преимущества кеширования на стороне брокера и предварительной выборки.

Когда вы go используете прямой JMS-API и пакетные транзакции (JMS, не обязательно XA) вы получите преимущество кеширования на стороне сервера брокера ActiveMQ и упреждающей выборки, что имеет огромное значение.

...