Как получить все очереди и темы от утешения - PullRequest
0 голосов
/ 08 января 2019

Я хочу узнать все направления из утешения (очереди и темы)

Я попытался использовать MBeanServerConnection и запросить имена (но я не нашел подходящего способа использовать это) или поиск JNDI. Destination dest = (Destination) context.lookup (Dest_name), но у меня нет имен очереди / темы. Я использую утешение - JMS библиотеки.

Я ищу что-то вроде этого: (но для утешения, а не activeMq) получить всю очередь из activeMQ

Ответы [ 3 ]

0 голосов
/ 19 января 2019

Вот некоторый исходный код, который может помочь. При правильно настроенном устройстве SEMP также доступен через JMS по теме "# SEMP / (router) / SHOW".

  /**
   * Return the SolTopicInfo for this topic (or all topics if 'topic' is null).
   * 
   * @param session
   * @param endpointName
   * @return
   */
  public static SolTopicInfo[] getTopicInfo(JCSMPSession session, String endpointName, String vpn,
      String sempVersion) {
    XMLMessageConsumer cons = null;
    XMLMessageProducer prod = null;
    Map<String, SolTopicInfo> tiMap = new HashMap<String, SolTopicInfo>();
    try {
      // Create a producer and a consumer, and connect to appliance.
      prod = session.getMessageProducer(new PubCallback());
      cons = session.getMessageConsumer(new SubCallback());
      cons.start();

      if (vpn == null) vpn = (String) session.getProperty(JCSMPProperties.VPN_NAME);
      if (sempVersion == null) sempVersion = getSempVersion(session);

      // Extract the router name.
      final String SEMP_SHOW_TE_TOPICS = "<rpc semp-version=\""
          + sempVersion
          + "\"><show><topic-endpoint><name>"
          + endpointName
          + "</name><vpn-name>"+ vpn + "</vpn-name></topic-endpoint></show></rpc>";

      RpcReply teTopics =  sendRequest(session, SEMP_SHOW_TE_TOPICS);

      for (TopicEndpoint2 te : teTopics.getRpc().getShow().getTopicEndpoint().getTopicEndpoints()
          .getTopicEndpointArray()) {
        SolTopicInfo ti = new SolTopicInfo();
        ti.setBindCount(te.getInfo().getBindCount());
        //qi.setDescription(qt.getInfo().getNetworkTopic());
        ti.setEndpoint(te.getName());
        ti.setMessageVPN(te.getInfo().getMessageVpn());
        ti.setTopic(te.getInfo().getDestination());
        ti.setDurable(te.getInfo().getDurable());
        ti.setInSelPres(te.getInfo().getIngressSelectorPresent());
        ti.setHwmMB(formatter.format(te.getInfo().getHighWaterMarkInMb()));
        ti.setSpoolUsageMB(formatter.format(te.getInfo().getCurrentSpoolUsageInMb()));
        ti.setMessagesSpooled(te.getInfo().getNumMessagesSpooled().longValue());
        String status = te.getInfo().getIngressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getEgressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getIngressSelectorPresent().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getType().substring(0, 1).toUpperCase();
        ti.setStatus(status);

        tiMap.put(ti.getEndpoint(), ti);
      }

    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } finally {
      if (cons != null)
        cons.close();
      if (prod != null)
        prod.close();
    }
    return tiMap.values().toArray(new SolTopicInfo[0]);
  }

  /**
   * Return the SolQueueInfo for this queue (or all queues if 'queue' is null).
   * 
   * @param session
   * @param queue
   * @param vpn (if null, use the session's vpn name)
   * @param sempVersion, if null use 'soltr/7_1_1'
   * @return
   */
  public static SolQueueInfo[] getQueueInfo(JCSMPSession session, String queue, String vpn,
      String sempVersion) {
    XMLMessageConsumer cons = null;
    XMLMessageProducer prod = null;
    Map<String, SolQueueInfo> qiMap = new HashMap<String, SolQueueInfo>();
    try {
      // Create a producer and a consumer, and connect to appliance.
      prod = session.getMessageProducer(new PubCallback());
      cons = session.getMessageConsumer(new SubCallback());
      cons.start();

      if (vpn == null) vpn = (String) session.getProperty(JCSMPProperties.VPN_NAME);
      if (sempVersion == null) sempVersion = getSempVersion(session);

      // Extract the router name.

      final String SEMP_SHOW_QUEUE_SUBS = "<rpc semp-version=\""
          + sempVersion
          + "\"><show><queue><name>"
          + queue
          + "</name><vpn-name>"+ vpn + "</vpn-name><subscriptions/><count/><num-elements>200</num-elements></queue></show></rpc>";

      RpcReply queueSubs = sendRequest(session, SEMP_SHOW_QUEUE_SUBS);

      for (QueueType qt : queueSubs.getRpc().getShow().getQueue().getQueues().getQueueArray()) {
        SolQueueInfo qi = new SolQueueInfo();
        qi.setBindCount(qt.getInfo().getBindCount());
        //qi.setDescription(qt.getInfo().getNetworkTopic());
        qi.setName(qt.getName());
        qi.setMessageVPN(qt.getInfo().getMessageVpn());
        qi.setDurable(qt.getInfo().getDurable());
        qi.setEgSelPres(qt.getInfo().getEgressSelectorPresent());
        qi.setHwmMB(formatter.format(qt.getInfo().getHighWaterMarkInMb()));
        qi.setMessagesSpooled(qt.getInfo().getNumMessagesSpooled().longValue());
        qi.setSpoolUsageMB(formatter.format(qt.getInfo().getCurrentSpoolUsageInMb()));
        String status = qt.getInfo().getIngressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getEgressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getAccessType().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getEgressSelectorPresent().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getType().substring(0, 1).toUpperCase();
        status += qt.getInfo().getDurable() ? " D" : " N";
        qi.setStatus(status);

        for (Subscription sub : qt.getSubscriptions().getSubscriptionArray()) {
          qi.addSubscription(sub.getTopic());
        }

        qiMap.put(qi.getName(), qi);
      }

    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } finally {
      if (cons != null)
        cons.close();
      if (prod != null)
        prod.close();
    }
    return qiMap.values().toArray(new SolQueueInfo[0]);
  }

  private static String getSempVersion(JCSMPSession session)
  {
    String retval = "soltr/7_1_1";
    try {
      String peerVersion = (String)session.getCapability(CapabilityType.PEER_SOFTWARE_VERSION);
      if (peerVersion != null)
      {
        retval = "soltr/";
        String[] version = peerVersion.split("\\.");
        retval += version[0];
        retval += "_" + version[1];
        if (!version[2].equals("0")) retval += "_" + version[2];
      }
    } catch (Throwable e) {
      System.err.println(e);
    }
    return retval;
  }

  private static RpcReply sendRequest(JCSMPSession session,
      final String requestStr)  {
    try {
      // Set up the requestor and request message.
      String routerName = (String) session
          .getCapability(CapabilityType.PEER_ROUTER_NAME);

      final String SEMP_TOPIC_STRING = String.format("#SEMP/%s/SHOW",
          routerName);
      final Topic SEMP_TOPIC = JCSMPFactory.onlyInstance().createTopic(
          SEMP_TOPIC_STRING);
      Requestor requestor = session.createRequestor();
      BytesXMLMessage requestMsg = JCSMPFactory.onlyInstance().createMessage(
          BytesXMLMessage.class);
      requestMsg.writeAttachment(requestStr.getBytes());

      BytesXMLMessage replyMsg = requestor
          .request(requestMsg, 5000, SEMP_TOPIC);

      String replyStr = new String();
      if (replyMsg.getAttachmentContentLength() > 0) {
        byte[] bytes = new byte[replyMsg.getAttachmentContentLength()];
        replyMsg.readAttachmentBytes(bytes);
        replyStr = new String(bytes, "US-ASCII");
      }

      RpcReplyDocument doc = RpcReplyDocument.Factory.parse(replyStr);

      RpcReply reply = doc.getRpcReply();

      if (reply.isSetPermissionError()) {
        throw new RuntimeException(
            "Permission Error: Make sure SEMP over message bus SHOW commands are enabled for this VPN");
      }

      if( reply.isSetParseError() ) {
        throw new RuntimeException( "SEMP Parse Error: " + reply.getParseError() );
      }

      if( reply.isSetLimitError() ) {
        throw new RuntimeException( "SEMP Limit Error: " + reply.getLimitError() );
      }

      if( reply.isSetExecuteResult() && reply.getExecuteResult().isSetReason() ) { // axelp: encountered this error on invalid 'queue' name
        throw new RuntimeException( "SEMP Execution Error: " + reply.getExecuteResult().getReason() );
      }

      return reply;
    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } catch (UnsupportedEncodingException e) {

      throw new RuntimeException(e.getMessage(), e);
    } catch (XmlException e) {

      throw new RuntimeException(e.getMessage(), e);
    }
  }
0 голосов
/ 21 мая 2019

Вы можете получить сообщения VPN для определенных очередей и тем, используя следующую команду SEMPv2.

curl -s -X GET -u semp_user:semp_pass management_host:management_port/SEMP/v2/monitor/msgVpns/{vpn-name}/queues?select="queueName"

curl -s -X GET -u semp_user:semp_pass management_host:management_port/SEMP/v2/monitor/msgVpns/{vpn-name}/topicEndpoints?select="topicEndpointName"
0 голосов
/ 10 января 2019

Для этого вам потребуется использовать SEMP через интерфейс управления.

Примеры команд:

curl -d '<rpc><show><queue><name>*</name></queue></show></rpc>' -u semp_username:semp_password http://your_management_ip:your_management_port/SEMP
curl -d '<rpc><show><topic-endpoint><name>*</name></topic-endpoint></show></rpc>' -u semp_username:semp_password http://your_management_ip:your_management_port/SEMP

Обратите внимание, что для простоты я использую curl, но любое приложение может выполнять HTTP POST для выполнения этих команд. Если вы используете Java, вы можете обратиться к образцу SempHttpSetRequest, найденному в примерах API Solace.

Документацию по SEMP можно найти здесь .


Тем не менее, главный вопрос здесь - зачем вам открывать все пункты назначения?

Одной из особенностей брокера сообщений является разделение издателей и потребителей.

Если вам нужно знать, публикуется ли ваше постоянное сообщение в теме без потребителей, вы можете использовать параметр reject-msg-to-sender-on-no-subscription-match в клиенте приложения публикации. профиль. Это означает, что издатель получит отрицательное подтверждение в случае, если он попытается опубликовать сообщение по теме, у которой нет подходящих подписчиков.

Подробнее см. В разделе «Обработка гарантированных сообщений без совпадений» в https://docs.solace.com/Configuring-and-Managing/Configuring-Client-Profiles.htm.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...