Как управлять параллельным HTTP-запросом на основе списков очереди сообщений (NodeJS) - PullRequest
0 голосов
/ 23 февраля 2019

Я уверен, что такого рода проблемы решались здесь много раз, но я не могу найти, как эти вопросы были сформулированы.

У меня есть микро-сервисы, которые управляют связью между моей инфраструктурой и брокером MQTT.Каждый раз, когда получен HTTP-запрос, я отправляю сообщение «Кто жив в комнате XXX?».сообщение в MQTT Broker, и каждый клиент, зарегистрированный в теме «XXX / alive», должен ответить, и я жду Y миллисекунд, прежде чем закрыть запрос, отправив обратно полученные ответы клиенту.

Это хорошо работает, когдаЯ обрабатываю один запрос.Но он облажается, когда запрашивается более одного запроса одновременно.

Вот маршрут Express, обрабатывающий запросы HTTP:

app.get('/espPassports', (req, res) => {
  mqttHelper.getESPPassports(req.query.model_Name).then((passports) => {
    res.send(passports).end();
  }).catch(err => {
    res.send(err).end();
  })
})

Вот как работает getESPPassports:

getESPPassports: async (model_Name) => {
    return new Promise((resolve, reject) => {
      // Say there is a request performed
      ongoing_request.isOpen = true;
      ongoing_request.model_Name = model_Name;
      //  Ask who is alive
      con.publish(topic, "ASK");
      setTimeout(() => {
      // If no answer after given timeout
        if (ongoing_request.passports.length == 0) {
          reject({ error: "No MQTT passports found" });
      // Else send a deep clone of the answers (else it's empty)
        } else {
          resolve(JSON.parse(JSON.stringify(ongoing_request.passports)));
        }
      // Delete the current request object and 'close it'
        ongoing_request.passports.length = 0;
        ongoing_request.isOpen = false;
        ongoing_request.model_Name = ""
      }, process.env.mqtt_timeout || 2000)
    })
  }
};

А вот слушатель MQTT:

con.on("message", (topic, message) => {
      // If a passport is received check the topic and if there is a request opened
      if (_checkTopic(topic) && ongoing_request.isOpen) {
        try {
          ongoing_request.passports.push(JSON.parse(message));
        } catch (error) {
      // do stuff if error
        }
      }
  }
})

Я знаю, что проблема связана с логическим значением, которое я использую, чтобы указать, выполняется ли запрос в данный момент, я думал о создании объектадля каждого нового запроса и идентифицируйте их по уникальному идентификатору (например, по метке времени), но у меня нет способа заставить список MQTT узнать этот уникальный идентификатор.

У меня есть какое-то другое решение, но я не уверен, что они будут работать, и я чувствую, что есть способ справиться с этим, о котором я не знаю.

Хорошего дня.

1 Ответ

0 голосов
/ 23 февраля 2019

Вам необходимо сгенерировать уникальный идентификатор для каждого запроса и включить его в сообщение MQTT, затем вы можете кэшировать объект экспресс-ответа с ключом с уникальным идентификатором.

Устройства должны включать уникальный идентификатор вих ответы, чтобы их можно было сопоставить с правильным ответом.

Другой подход - просто кэшировать ответы от устройств и назначить кэшу время жизни, чтобы вам не приходилось каждый раз запрашивать устройства.

...