Как заставить отключиться от ActiveMQ соединения с Stomp.py - PullRequest
1 голос
/ 06 января 2020

При прослушивании очереди сообщений с использованием надежного соединения я получаю сообщение об ошибке в слушателе. Я имитирую это, нажимая CTRL - Z , чтобы выйти из программы. Попытка переподключения дает мне сообщение об ошибке:

on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:197)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
        at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:70)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:745)
"

Я попытался отписаться и отключить соединение, используя метод, описанный ниже, но это не отсоединит меня.

class MyListener(stomp.ConnectionListener):
    """This is a listener class that listens for new messages using the STOMP protocol"""

    def __init__(self, conn):
        self.conn = conn
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except:
            print('disconnect failed')

Как заставить сервер AMQ забыть мое предыдущее соединение?

1 Ответ

0 голосов
/ 07 января 2020

Посредник не может отключить другие ресурсы клиента от другого соединения. Вместо этого вам следует настроить соединение со значением времени простоя как для клиента, так и для брокера, чтобы обе стороны обрабатывали удаленное отбрасывание без того, чтобы сокет не обнаружил закрытие.

Можно также настроить транспортный соединитель посредника со значениями периода отсрочки сердцебиения для клиентов Stomp, которые не объявляют сердцебиения, см. Документацию STOMP брокера ActiveMQ .

Спецификация STOMP описывает, как работает сердцебиение:

...