Я работаю над производственной проблемой с проблемой HornetQ Queue.
Ниже приведен стек для моего приложения
Wildfly 9.0.2
HornetQ 2.4.7
Spring 4.3
Я использую соединение netty для hornetq, в wildfly, следующая конфигурация в
standalone-full.xml
<connection-factory name="RemoteConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="RemoteConnectionFactory"/>
<entry name="java:jboss/exported/RemoteConnectionFactory"/>
</entries>
<block-on-non-durable-send>false</block-on-non-durable-send>
<block-on-durable-send>false</block-on-durable-send>
<consumer-window-size>0</consumer-window-size>
</connection-factory>
Конфигурация пружины выглядит следующим образом.
<bean id = "atfQueueMDP" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name = "concurrentConsumers" value = "20"/>
<property name = "connectionFactory" ref = "connectionFactory"/>
<property name = "destination" ref = "atfQueue"/>
<property name = "messageListener" ref = "messageListener"/>
<property name = "sessionAcknowledgeMode" value = "2"/>
</bean>
<bean id="connectionFactory" class="org.hornetq.jms.client.HornetQJMSConnectionFactory">
<constructor-arg value="false"/>
<constructor-arg>
<bean name="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration">
<constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
<constructor-arg>
<map key-type="java.lang.String" value-type="java.lang.Object">
<entry key="host" value="localhost" />
<entry key="port" value="5445" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
<bean id = "ATFJmsTemplate" class = "org.springframework.jms.core.JmsTemplate">
<property name = "connectionFactory">
<ref bean = "connectionFactory"/>
</property>
<property name = "sessionAcknowledgeMode" value = "2"/>
<property name = "receiveTimeout">
<value>-1</value>
</property>
</bean>
<bean id = "atfQueue" class = "org.springframework.jndi.JndiObjectFactoryBean" >
<property name = "jndiTemplate">
<ref bean = "jndiTemplate"/>
</property>
<property name = "jndiName">
<value>queue/ATFQueue</value>
</property>
</bean>
<bean id = "messageListener" class = "com.example.jms.receiver.ATFReceiver" />
Теперь проблема заключается в следующем.
Я указал параллельного потребителя как 20, так что, согласно моему пониманию, мой слушатель может обработать 20 одновременных сообщений.
Ошибка при выполнении следующего условия
- Первое сообщениеполучен клиентом и занимает около 5 минут для завершения
- Следующие 19 сообщений отправляются и принимаются оставшимися 19 потребителями, затем все это потребляется каждым потребителем, все они завершаются в течение нескольких секунд, скажем, (10секунд)
- Теперь, если я отправлю 1 сообщение, но этот шне будет доставлено клиенту, так как все потребители заняты, и он попадает в очередь, но после завершения выше 19-го сообщения все потребители свободны, , но все еще последние сообщения находятся в очереди (не доставляются потребителю).Это сообщение будет доставлено только после того, как будет завершено первое сообщение (из 5 минут).
- После завершения 19-го сообщения, если я отправлю новое сообщение, оно будет немедленно доставлено потребителю, но выше одно сообщение все еще находится в очереди.
Может кто-нибудь подсказать, пожалуйста, как решить эту проблему, чтобы, если какой-либо потребитель освободился, он немедленно получил следующее сообщение.
Я подготовил проект черезкоторый вы можете воспроизвести этот сценарий. ProjectDemo
Заранее спасибо