Как убить потоки "ActiveMQ Transport" после прерывания JMS MessageConsumer.receive () - PullRequest
3 голосов
/ 16 июля 2011

У меня есть класс, который получает подключение ActiveMQ, используя ActiveMQConnectionFactory.createConnection (). Затем он создает и владеет сеансом, получателем и получателем (очередью), которые прослушивают этот получатель.

Я вызываю receive () или receive (millis) для получателя, чтобы получить мои сообщения из очереди. В определенных сценариях мне приходится убивать (прерывать) поток, в котором вызывается метод приема. Я пытаюсь закрыть сессию и соединение сразу после этого. К сожалению, я постоянно получаю исключение при вызове close и связанные потоки "ActiveMQ Transport" (и связанные с ним соединения с брокером) остаются в живых. Я получаю исключение

org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection
at
org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253)
at
org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94)
at org.myorg.myservice.job.Job.run(Job.java:206)
Caused by: javax.jms.JMSException: java.io.InterruptedIOExceptio)
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227)
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799)
at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636)
at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627)
at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232)
... 2 more
Caused by: java.io.InterruptedIOException
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
... 7 more

Я пробовал следующие URL-адреса подключения отказоустойчивый: // ТСР: //broker.ip.address: 61616 ТСР: //broker.ip.address: 61616 отказоустойчивый: // ТСР: //broker.ip.address: 61616 след = истина & closeAsync = ложь

Также я попытался использовать вызов MessageConsumer :: receiveNoWait () и просто выполнить свой собственный Thread.sleep, но я всегда получаю исключение выше, когда вызываю следующее, чтобы закрыть соединение

try {
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck
    TaskRunnerFactory taskRunner = ((ActiveMQConnection)connection).getSessionTaskRunner();
    taskRunner.shutdown();

    if (producer != null) {
        producer.close();
    }
    if (consumer != null) {
        consumer.close();
    }
    session.close();
    if (connection instanceof ActiveMQConnection) {
        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        amqConnection.stop();
    }           
    connection.stop();
    connection.close();
}
catch (ConnectionClosedException e) {
    // NOOP - this is ok
}
catch (Exception e) {
    throw new MessagingException("Failed to close JMS connection", e, log);
}

Ответы [ 2 ]

1 голос
/ 01 апреля 2016

Одним из решений может быть установка флага демона в транспортном потоке TCP ActiveMQ, например tcp://URI:PORT?daemon=true

Ознакомьтесь с документацией ActiveMQ: http://activemq.apache.org/tcp-transport-reference.html

1 голос
/ 13 сентября 2013

Я не уверен, почему закрытие не работает для вас.Но вы можете достичь желаемого с помощью асинхронного прослушивателя сообщений.

Connection connection = connectionFactory.createConnection();
connection.start();

Session session = 
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination queue = session.createQueue("queueName");

Consumer consumer = session.createConsumer( queue );

consumer.setMessageListener( new MessageListener()
{
    public void onMessage( Message message )
    { 
         // whatever was after the receive() method call
    }
} );

Тогда прерывать не нужно.Когда вы закончите, закройте вещи:

consumer.close();
session.close();
queue.close();
connection.close();
...