Причина: Брокер: localhost - Клиент: tata_consumer уже подключен с tcp: //127.0.0.1: 52917 - PullRequest
0 голосов
/ 03 мая 2020

Я новичок в ActiveMQ и пытаюсь поэкспериментировать с работой тем. Я создал Producer с использованием JMS и двух прослушивателей Consumer, настроенных с помощью Spring. Я знаю, что если используется несколько потребителей, то каждый потребитель должен иметь уникальный идентификатор клиента. Даже после сохранения уникального идентификатора клиента, я получаю ту же ошибку.

ERROR | Could not refresh JMS Connection for destination 'hetal_rachh_topic' - retrying using FixedBackOff{interval=5000, currentAttempts=4, maxAttempts=unlimited}. Cause: Broker: localhost - Client: tata_consumer already connected from tcp://127.0.0.1:52917
ERROR | Could not refresh JMS Connection for destination 'hetal_rachh_topic' - retrying using FixedBackOff{interval=5000, currentAttempts=4, maxAttempts=unlimited}. Cause: Broker: localhost - Client: cipla_consumer already connected from tcp://127.0.0.1:52918

Примечание: я даже пытался создать только один контейнер слушателя JMS, и все же ошибка говорит о том, что конкретный клиент уже подключен.

Шаги того, что я пытаюсь сделать -

  1. Создан проект maven с контроллером REST, имеющим два API. Первый API просто инициализирует массив. И второй API вызывает производителя, который отправляет сообщение в topi c с использованием JMSTemplate.
  2. Настроил только один JMS-слушатель в весеннем контекстном файле
  3. Код пользовательского интерфейса использует ax ios для создания вызов первого API, который инициализирует некоторый массив данных. И затем через каждый интервал вызывает второй API, который вызывает производителя для отправки сообщений, которые в дальнейшем используются слушателями. Контроллер (второй API) в основном возвращает строку JSON сообщений, полученных обратно в пользовательский интерфейс.
  4. Я не использую какой-либо веб-сервер локально, но подключаемый модуль tomcat7 для запуска моего весеннего проекта mvn tomcat7:run. Я запускаю локальный сервер AMQ activemq start и запускаю свой код пользовательского интерфейса, используя npm start

Ниже приведен мой ReactJs фрагмент кода пользовательского интерфейса, который делает вызовы ios.

componentDidMount() {
    this.callCreateConnection();
    this.interval = setInterval(this.callStockUpdatesAPI, 15000);
  }

  componentWillUnmount() {
    clearInterval(this.interval);
  }

  async callCreateConnection() {
    await axios
      .get("http://localhost:8080/stock-market-ticker/createConnection", {
        headers: {
          "Access-Control-Allow-Origin": "*",
          "Access-Control-Allow-Methods": "GET",
        },
      })
      .then((res) => {
        console.log("Response", res.data);
      })
      .catch((error) => {
        console.log("Error", error);
      });
  }

  async callStockUpdatesAPI() {
    console.log("Inside api");
    await axios
      .get("http://localhost:8080/stock-market-ticker/getStockUpdates", {
        headers: {
          "Access-Control-Allow-Origin": "*",
          "Access-Control-Allow-Methods": "GET",
        },
      })
      .then((response) => {
        console.log("Response", response.data);
        switch (response.data.symbol) {
          case "TATAMOTORS":
            let oldData_tata = { ...this.state.data };

            oldData_tata[0].price = response.data.price;
            oldData_tata[0].percent_change_1h = response.data.change_1hr;
            oldData_tata[0].percent_change_24h = response.data.change_24hr;
            this.setState({ data: oldData_tata });

            break;
          case "CIPLA":
            let oldData_cipla = { ...this.state.data };
            oldData_cipla[0].price = response.data.price;
            oldData_cipla[0].percent_change_1h = response.data.change_1hr;
            oldData_cipla[0].percent_change_24h = response.data.change_24hr;
            this.setState({ data: oldData_cipla });

            break;
          case "ASIANPAINT":
            let oldData_asian = { ...this.state.data };
            oldData_asian[1].price = response.data.price;
            oldData_asian[1].percent_change_1h = response.data.change_1hr;
            oldData_asian[1].percent_change_24h = response.data.change_24hr;
            this.setState({ data: oldData_asian });

            break;
          default:
            return;
        }
      })
      .catch((error) => {
        console.log("Error", error);
      });
 }

Я поделился кодом приложения Spring на https://github.com/hetalrachh/stock-market-ticker.

Как я могу это исправить? Если кто-нибудь может помочь. Заранее спасибо.

1 Ответ

0 голосов
/ 03 мая 2020

Я считаю, что проблема в том, что каждый Spring listener-container создает несколько слушателей, и каждый из этих нескольких слушателей использует одно и то же значение client-id из своего родительского контейнера. Это не разрешено спецификацией JMS. Попробуйте использовать concurrency="1" на элементах listener-container, например:

    <jms:listener-container acknowledge="auto"
        connection-factory="connectionFactory" destination-type="durableTopic"
        client-id="tata_consumer" concurrency="1">
        <jms:listener destination="hetal_rachh_topic" ref="tataConsumer"
            method="onMessage" subscription="subscription" selector="symbol=TATAMOTORS" />
    </jms:listener-container>

    <jms:listener-container acknowledge="auto"
        connection-factory="connectionFactory" destination-type="durableTopic"
        client-id="cipla_consumer" concurrency="1">
        <jms:listener destination="hetal_rachh_topic" ref="ciplaConsumer"
            method="onMessage" subscription="subscription" selector="symbol=CIPLA" />
    </jms:listener-container>

...