QPID - Spring CachingConnectionFactory - переподключение - PullRequest
7 голосов
/ 23 сентября 2011

Конфигурация Spring

 <!-- Qpid connection factory built from a single connection URL:
      credentials guest/guest, virtual host "test", broker list tcp://localhost:5672. -->
 <bean id="jmsQueueConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
    <constructor-arg index="0"
        value="amqp://guest:guest@localhost/test?brokerlist='tcp://localhost:5672'" />
</bean>

<!-- Spring caching wrapper around jmsQueueConnectionFactory with a session cache size of 1.
     reconnectOnException is the property toggled between the two log excerpts further down. -->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsQueueConnectionFactory" />
    <property name="sessionCacheSize" value="1" />
    <property name="reconnectOnException" value="true" />
</bean>

<!-- Destination given as a Qpid address string for "myqueue".
     NOTE(review): "create: always" presumably asks for the queue to be created when missing;
     confirm against the Qpid addressing docs. -->
<bean id="myDestination" class="org.apache.qpid.client.AMQAnyDestination">
    <constructor-arg index="0" value="ADDR:myqueue; {create: always}" />
</bean>

<!-- Instance of com.test.MyService; wired into myContainer as its messageListener. -->
<bean id="myServiceBean" class="com.test.MyService" />

<!-- Listener container: receives from myDestination through cachingConnectionFactory and
     hands each message to myServiceBean, using a single concurrent consumer. -->
<bean id="myContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingConnectionFactory" />
    <!-- The same caching factory is also registered as the container's exception listener. -->
    <property name="exceptionListener" ref="cachingConnectionFactory" /> 
    <property name="messageListener" ref="myServiceBean" />
    <property name="concurrentConsumers" value="1" />
    <property name="autoStartup" value="true" />
    <property name="destination" ref="myDestination" />
    <!-- NOTE(review): presumably milliseconds (10 s between recovery attempts); confirm in Spring docs. -->
    <property name="recoveryInterval" value="10000" />      
</bean>

MyService.java

/**
 * Message listener that writes one info-level log line for every message it is given.
 */
public class MyService implements MessageListener {

    /**
     * Logs the message followed by the name of its runtime class.
     *
     * @param msg the message handed to this listener
     */
    public void onMessage(Message msg) {
        // Assemble the line piece by piece, in the same order as a plain concatenation would.
        StringBuilder line = new StringBuilder("----On Message called :");
        line.append(msg);
        line.append(", :");
        line.append(msg.getClass().getName());
        log.info(line.toString());
    }
}

Когда я перезапускаю QPID

Без reconnectOnException = true я получаю это исключение, но повторного подключения не происходит

3203 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@1d03a4e
3312 [myContainer-1] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS MessageConsumer for destination ['myqueue'/None; {
  'create': 'always'
}]: org.apache.qpid.client.BasicMessageConsumer_0_10@8a2023
99109 [myContainer-1] WARN org.springframework.jms.listener.DefaultMessageListenerContainer  - Setup of JMS message listener invoker failed for destination ''myqueue'/None; {
  'create': 'always'
}' - trying to recover. Cause: timed out waiting for session to become open (state=DETACHED)
org.apache.qpid.transport.SessionException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.transport.Session.invoke(Session.java:630)
    at org.apache.qpid.transport.Session.invoke(Session.java:559)
    at org.apache.qpid.transport.SessionInvoker.executionSync(SessionInvoker.java:84)
    at org.apache.qpid.transport.Session.sync(Session.java:782)
    at org.apache.qpid.transport.Session.sync(Session.java:770)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.getMessageFromQueue(BasicMessageConsumer_0_10.java:423)
    at org.apache.qpid.client.BasicMessageConsumer.receive(BasicMessageConsumer.java:407)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:429)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:619)
99109 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection

..

281125 [myContainer-3] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection


С reconnectOnException = true, подключение и отключение

    13015 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: connection aborted
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:303)
    at org.apache.qpid.transport.Connection.closed(Connection.java:568)
    at org.apache.qpid.transport.network.Assembler.closed(Assembler.java:110)
    at org.apache.qpid.transport.network.InputHandler.closed(InputHandler.java:202)
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.transport.ConnectionException: connection aborted
    at org.apache.qpid.transport.Connection.closed(Connection.java:541)
    ... 4 more
13031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 1
73031 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Could not close shared JMS Connection
org.apache.qpid.client.JMSAMQException: timed out waiting for session to become open (state=DETACHED)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:824)
    at org.springframework.jms.connection.SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:422)
    at org.springframework.jms.connection.SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:321)
    at org.springframework.jms.connection.CachingConnectionFactory.resetConnection(CachingConnectionFactory.java:197)
    at org.springframework.jms.connection.SingleConnectionFactory.onException(SingleConnectionFactory.java:302)
    at org.springframework.jms.connection.ChainedExceptionListener.onException(ChainedExceptionListener.java:57)
    at org.apache.qpid.client.AMQConnectionDelegate_0_10.closed(AMQConnectionDelegate_0_10.java:306)
    ...
    at org.apache.qpid.transport.network.io.IoReceiver.run(IoReceiver.java:150)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: timed out waiting for session to become open (state=DETACHED) [error code 541: internal error]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    at org.apache.qpid.client.AMQSession_0_10.sync(AMQSession_0_10.java:1030)
    at org.apache.qpid.client.AMQSession_0_10.sendSuspendChannel(AMQSession_0_10.java:857)
    at org.apache.qpid.client.AMQSession.suspendChannel(AMQSession.java:3006)
    at org.apache.qpid.client.AMQSession.stop(AMQSession.java:2341)
    at org.apache.qpid.client.AMQConnection.stop(AMQConnection.java:820)
    ... 11 more

104875 [myContainer-1] INFO org.springframework.jms.connection.CachingConnectionFactory  - Established shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672
Virtual Host: test
Client ID: localhost
Active session count: 0
104875 [myContainer-1] INFO org.springframework.jms.listener.DefaultMessageListenerContainer  - Successfully refreshed JMS Connection
104875 [myContainer-2] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Creating cached JMS Session for mode 1: org.apache.qpid.client.AMQSession_0_10@191e4c
104937 [IoReceiver - localhost/127.0.0.1:5672] WARN org.springframework.jms.connection.CachingConnectionFactory  - Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: 404
    at org.apache.qpid.client.AMQConnection.exceptionReceived(AMQConnection.java:1230)

    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.qpid.AMQException: ch=0 id=0 ExecutionException(errorCode=NOT_FOUND, commandId=0, description=Queue: myqueue not found) [error code 404: not found]
    at org.apache.qpid.client.AMQSession_0_10.setCurrentException(AMQSession_0_10.java:1050)
    ... 29 more
104937 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.springframework.jms.connection.CachingConnectionFactory  - Closing shared JMS Connection: AMQConnection:
Host: localhost
Port: 5672


1 Ответ

0 голосов
/ 06 марта 2014

Вы пытались убрать параметр exceptionListener у DMLC? Я не припомню необходимости указывать его, и похоже, что ваше соединение становится недействительным после того, как было инициировано восстановление соединения.

Кроме того, если вы не используете JBoss 4, вы можете попробовать встроенный механизм кэширования DMLC (через настройку cacheLevel) вместо кэширующей фабрики соединений для ваших потребителей. Вы даже можете получить более высокую производительность, поскольку DMLC может кэшировать сеансы и потребителей, а также соединения.

...