ActiveMQ Прослушиватель соединения с получателем / производителем - PullRequest
4 голосов
/ 09 ноября 2009

Кажется, я не могу найти способ прослушивания новых подключений производителя и потребителя (или прерываний соединения) в ActiveMQ (версия Java). Я хочу быть в состоянии сказать потребителям (или они могут узнать сами), что связь производителя разорвалась. Требуется и обратный путь (производитель узнает, что определенный потребитель отключен).

Буду признателен за помощь.

Ответы [ 2 ]

4 голосов
/ 10 ноября 2009

Я думаю, что вы хотите слушать новых производителей и потребителей в определенном месте назначения (определенной очереди или теме). Это верно?

Вы можете создавать экземпляры ConsumerEventSource и ProducerEventSource и регистрировать своих собственных слушателей, вызывая их setConsumerListener и setProducerListener для них соответственно.

Итак:

Connection conn = yourconnection; // the connection your listener will use
Destination dest = yourdestination; // the destination you're paying attention to
ConsumerEventSource source = new ConsumerEventSource(conn, dest);
source.setConsumerListener(new ConsumerListener() {

   public void onConsumerEvent(ConsumerEvent event) {
      if (event.isStarted()) {
         System.out.println("a new consumer has started - " + event.getConsumerId());
      } else {
         System.out.println("a consumer has dropped - " + event.getConsumerId());
      }
   }

});

Если вы посмотрите на код для ConsumerEventSource или ProducerEventSource, вы увидите, что это простые объекты, использующие методы AdvisorySupport для прослушивания специальной консультативной темы, целью которой является трансляция новостей о производителях и потребителях. Вы можете узнать больше, прочитав исходный код этих классов.

Ваше использование «соединения» потенциально является проблемой; в земле ActiveMQ (которая является подмножеством земли JMS) «Соединение» - это объект более низкого уровня, который не связан с конкретным пунктом назначения. Определенный клиент создает «Сеанс» из Соединения - все еще не специфичный для пункта назначения - и затем создает для определенного пункта назначения QueueSender, QueueReceiver, TopicPublisher или TopicSubscriber. Когда они созданы или когда сеансы, которые их создали, умирают, это те события, о которых вы хотите услышать, и о которых вы услышите, если будете использовать код выше.

2 голосов
/ 12 ноября 2009

Вся необходимая информация публикуется в темах ActiveMQ Advisory, таких как «ActiveMQ.Advisory.Connection» или просто «ActiveMQ.Advisory ..>» для всех из них. Даже события, происходящие в Stomp Connection, публикуются в разделах с рекомендациями ActiveMQ. Следующий код дает пример этого (протестировано с клиентом Flex, подключенным через Stomp):

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("user", "password", ActiveMQConnection.DEFAULT_BROKER_URL);
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, ackMode);
connection.start();
Destination destinationAdvisory = session.createTopic("ActiveMQ.Advisory..>");
MessageConsumer consumerAdvisory = session.createConsumer(destinationAdvisory);
consumerAdvisory.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {

            if (message instanceof ActiveMQMessage) {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                Object command = activeMessage.getDataStructure();
                if (command instanceof ConsumerInfo) {
                    System.out.println("A consumer subscribed to a topic or queue: " + command);
                } else if (command instanceof RemoveInfo) {
                    RemoveInfo removeInfo = (RemoveInfo) command;
                    if (removeInfo.isConsumerRemove()) {
                        System.out.println("A consumer unsubscribed from a topic or queue");
                    }
                    else {
                        System.out.println("RemoveInfo, a connection was closed: " + command);
                    }
                } else if (command instanceof ConnectionInfo) {
                    System.out.println("ConnectionInfo, a new connection was made: " + command);
                } else {
                    System.out.println("Unknown command: " + command);
                }
            }
    }
});
...