Правильная настройка ActiveMQ во избежание утечек памяти у производителя - PullRequest
0 голосов
/ 11 января 2019

Я не эксперт ActiveMQ, но я много раз пытался найти в Интернете похожие проблемы, и я все еще в замешательстве. У меня следующая проблема.

Запуск веб-приложения в Tomcat 8.x, Java 8, Spring Framework 4.3.18. Мое веб-приложение отправляет и получает сообщения с ActiveMQ, используя org.apache.activemq:activemq-spring:5.11.0 зависимость.

Я настраиваю фабрику соединений ActiveMQ следующим образом:

<amq:connectionFactory id="amqJmsFactory" brokerURL="${jms.broker.url}" />
<bean id="jmsConnectionFactory"
    class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="amqJmsFactory" />
    <property name="maxConnections" value="2" />
    <property name="idleTimeout" value="60000" />
    <property name="timeBetweenExpirationCheckMillis" value="600000" />
    <property name="maximumActiveSessionPerConnection" value="10" />
</bean>

Последнее свойство (maximumActiveSessionPerConnection) было установлено, чтобы попытаться решить следующую проблему (по умолчанию, кажется, 500, что довольно много, ИМХО), но я не уверен, что это действительно помогло, потому что я все еще получаю ошибки OutOfMemory.

На эту фабрику соединений ссылается фабрика слушателей:

<jms:listener-container factory-id="activationJmsListenerContainerFactory"
    container-type="default" connection-factory="jmsConnectionFactory"
    concurrency="1" transaction-manager="centralTransactionManager">
</jms:listener-container>

одним входящим адаптером Spring Integration 4.3.17:

<int-jms:message-driven-channel-adapter id="invoiceEventJmsInboundChannelAdapter" 
    channel="incomingInvoiceEventJmsChannel"
    connection-factory="jmsConnectionFactory"
    destination-name="incomingEvent"
    max-concurrent-consumers="2"
    transaction-manager="customerTransactionManager"
    error-channel="unexpectedErrorChannel" />

и двумя исходящими адаптерами:

<int-jms:outbound-channel-adapter id="invoiceEventJmsOutboundChannelAdapter"
    channel="outgoingInvoiceEventJmsChannel" destination-name="outgoingEvent"
    connection-factory="jmsConnectionFactory" explicit-qos-enabled="true" delivery-persistent="true" 
    session-transacted="true" />

<int-jms:outbound-channel-adapter
    id="passwordResetTokenSubmitterJmsOutboundChannelAdapter"
    channel="passwordResetTokenSubmitterJmsChannel"
    destination-name="passwordReset"
    connection-factory="jmsConnectionFactory" explicit-qos-enabled="true"
    delivery-persistent="false" session-transacted="false" />

Все работает хорошо, но я замечаю, что ActiveMQ, как производитель сообщений (для адаптера invoiceEventJmsOutboundChannelAdapter), накапливает много объектов в памяти и вызывает ошибки OutOfMemory в моем приложении. Мои сообщения могут занимать несколько килобайт, потому что их полезные данные представляют собой файлы XML, но, тем не менее, я не ожидаю, что в течение длительного времени будет храниться столько памяти.

Вот мои выводы о дампе кучи, созданном из-за самой последней ошибки OutOfMemory (с помощью Eclipse MAT для расследования). Два подозреваемых утечки найдены, и оба приводят к ConnectionStateTracker.

Вот один из двух аккумуляторов:

Class Name                                                                                                  | Shallow Heap | Retained Heap
-------------------------------------------------------------------------------------------------------------------------------------------
java.util.concurrent.ConcurrentHashMap$HashEntry[4] @ 0xe295da78                                            |           32 |    58.160.312
'- table java.util.concurrent.ConcurrentHashMap$Segment @ 0xe295da30                                        |           40 |    58.160.384
   '- [15] java.util.concurrent.ConcurrentHashMap$Segment[16] @ 0xe295d9e0                                  |           80 |    68.573.384
      '- segments java.util.concurrent.ConcurrentHashMap @ 0xe295d9b0                                       |           48 |    68.573.432
         '- sessions org.apache.activemq.state.ConnectionState @ 0xe295d7e0                                 |           40 |    68.575.312
            '- value java.util.concurrent.ConcurrentHashMap$HashEntry @ 0xe295d728                          |           32 |    68.575.344
               '- [1] java.util.concurrent.ConcurrentHashMap$HashEntry[2] @ 0xe295d710                      |           24 |    68.575.368
                  '- table java.util.concurrent.ConcurrentHashMap$Segment @ 0xe295d6c8                      |           40 |    68.575.440
                     '- [12] java.util.concurrent.ConcurrentHashMap$Segment[16] @ 0xe295d678                |           80 |    68.575.616
                        '- segments java.util.concurrent.ConcurrentHashMap @ 0xe295d648                     |           48 |    68.575.664
                           '- connectionStates org.apache.activemq.state.ConnectionStateTracker @ 0xe295d620|           40 |    68.575.808
-------------------------------------------------------------------------------------------------------------------------------------------

Как видите, экземпляр ConnectionStateTracker сохраняет около 70 МБ пространства кучи. Существует два экземпляра ConnectionStateTracker (один для каждого исходящего адаптера, я полагаю), которые сохраняют в общей сложности около 120 МБ кучи. Они накапливают его в двух экземплярах ConnectionState, которые, в свою очередь, имеют карту «сессий», содержащую общее количество 10 SessionState экземпляров, в которых, в свою очередь, ConcurrentHashMap производителей, имеющих общее количество в 1 258 ProducerState экземпляров. Они сохраняют те 120 МБ кучи в их поле transactionState, которое имеет тип TransactionState, которое, в свою очередь, имеет commands ArrayList, которое, похоже, сохраняет все сообщения, которые я отправляю.

Мой вопрос: почему ActiveMQ хранит в памяти уже отправленные сообщения? Есть также некоторые проблемы безопасности при хранении всех этих сообщений в памяти.

Ответы [ 3 ]

0 голосов
/ 13 марта 2019

Вот как я наконец решил это.

Я думаю, что главная проблема здесь была ошибка AMQ-6603 . Итак, первым делом мы обновились до ActiveMQ 5.15.8. Я думаю, этого было бы достаточно, чтобы исправить утечку.

Однако мы также немного изменили нашу конфигурацию, обнаружив, что использование фабрики пулов соединений с фабрикой слушателей не рекомендуется. Я думаю, что документация ActiveMQ сбивает с толку и что правильная конфигурация JMS сложнее, чем должна быть. В любом случае, если вы прочитаете документацию DefaultMessageListenerContainer, вы прочтете:

Не используйте org.springframework.jms.connection.CachingConnectionFactory Spring в сочетании с динамическим масштабированием. В идеале, вообще не используйте его с контейнером слушателя сообщений, так как обычно предпочтительно, чтобы сам контейнер слушателя обрабатывал соответствующее кэширование в течение своего жизненного цикла. Кроме того, остановка и перезапуск контейнера слушателя будет работать только с независимым локально кэшированным подключением, а не с внешним кэшированным.

То же самое должно применяться к ActiveMQ PooledConnectionFactory тогда. Однако документация ActiveMQ гласит:

Spring's MessagListenerContainer должен использоваться для потребления сообщений. Это обеспечивает всю мощь MDB - эффективное потребление JMS и пул прослушивателей сообщений - но без необходимости полного контейнера EJB.

Вы можете использовать activemq-pool org.apache.activemq.pool.PooledConnectionFactory для эффективного объединения подключений и сеансов для вашей группы потребителей, или вы можете использовать Spring JMS org.springframework.jms.connection.CachingConnectionFactory для достижения того же эффекта.

Итак, документация ActiveMQ предполагает обратное. Для этого я открыл ошибку AMQ-7140 . По этой причине я теперь внедряю PooledConnectionFactory (или Spring CachingConnectionFactory) только для клиентов JMS и обычную некэшируемую фабрику соединений ActiveMQ (созданную с <amq:connectionFactory>) при сборке Контейнерные фабрики слушателей либо с Spring <jms:listener-container>, либо с Spring Integration <int-jms:message-driven-channel-adapter>, и я скорее устанавливаю их атрибуты, связанные с параллелизмом и уровнем кэша.

Дополнительная трудность заключалась в том, что мы передаем локально созданный менеджер транзакций фабрикам-слушателям контейнеров, чтобы синхронизировать фиксацию сообщения JMS с фиксацией базы данных. Это действительно заставляет фабрику-приемник контейнера полностью отключать механизм кэширования соединений по умолчанию, если только вы не установили явный уровень кэширования (см. org.springframework.jms.listener.DefaultMessageListenerContainer.setCacheLevel(int) Javadoc).

Я заканчиваю этот ответ, говоря, что было трудно найти решение, особенно потому, что я не получил никакого отклика ни от одного разработчика ActiveMQ, ни в списке рассылки , ни в системе отслеживания проблем, даже если имхо это можно рассматривать как проблему безопасности. Это бездействие вместе с отсутствием альтернатив заставляет меня задуматься о том, стоит ли мне по-прежнему рассматривать JMS для моего следующего проекта.

0 голосов
/ 15 апреля 2019

TL; DR

Отключить anonymousProducers на вашем PooledConnectionFactory.


У нас была очень похожая проблема с тем, что службе не хватает памяти при использовании транзакций JMS и PooledConnectionFactory, настроенной максимум на 8 соединений.

Однако мы не использовали DefaultMessageListenerContainer или Spring, а отправляли только одному производителю.

Этот производитель отвечал за отправку большого количества сообщений из пакетного задания, и мы обнаружили, что при сбое соединения он оставляет эти сообщения на ConnectionStateTracker, подключенном к старому соединению. После нескольких отказов эти сообщения будут накапливаться на старых соединениях до точки, в которой у нас кончится куча.

Кажется, единственный способ очистить эти сообщения из памяти - закрыть производителя после совершения транзакции JMS. Это удаляет экземпляр ProducerState из SessionState в ConnectionStateTracker.

Вызов RemoveTransactionAction, который SimonD упомянул в своем ответе (что происходит автоматически после совершения транзакции JMS), просто удаляет TransactionState из ConnectionState, но все равно оставляет производителя и его сообщения на SessionState объект.

К сожалению вызов close() для производителя не работает "из коробки" с PooledConnectionFactory, так как по умолчанию он использует анонимных производителей - вызов метода close() для анонимного производителя не имеет эффект. Сначала вы должны вызвать setUseAnonymousProducers(false) на PooledConnectionFactory, чтобы закрытие продюсера имело какой-либо эффект.

Также стоит отметить, что вы должны вызывать close() для производителя - вызов close() в сеансе не приведет к закрытию производителя, несмотря на то, что предлагает JavaDoc для ActiveMQSession. Вместо этого он вызывает метод dispose() для производителя.

0 голосов
/ 12 марта 2019

Мы сталкиваемся с той же проблемой на сайте клиента.

Глядя на код ActiveMQ, в ConnectionStateTracker есть класс RemoveTransactionAction, который удаляет записи в ответ на сообщение OpenWire RemoveInfo (тип 12). Похоже, это сообщение генерируется ActiveMQ после получения сообщения.

...