Я уверен, что такого рода проблемы решались здесь много раз, но я не могу найти, как эти вопросы были сформулированы.
У меня есть микро-сервисы, которые управляют связью между моей инфраструктурой и брокером MQTT.Каждый раз, когда получен HTTP-запрос, я отправляю сообщение «Кто жив в комнате XXX?».сообщение в MQTT Broker, и каждый клиент, зарегистрированный в теме «XXX / alive», должен ответить, и я жду Y миллисекунд, прежде чем закрыть запрос, отправив обратно полученные ответы клиенту.
Это хорошо работает, когдаЯ обрабатываю один запрос.Но он облажается, когда запрашивается более одного запроса одновременно.
Вот маршрут Express, обрабатывающий запросы HTTP:
app.get('/espPassports', (req, res) => {
mqttHelper.getESPPassports(req.query.model_Name).then((passports) => {
res.send(passports).end();
}).catch(err => {
res.send(err).end();
})
})
Вот как работает getESPPassports:
getESPPassports: async (model_Name) => {
return new Promise((resolve, reject) => {
// Say there is a request performed
ongoing_request.isOpen = true;
ongoing_request.model_Name = model_Name;
// Ask who is alive
con.publish(topic, "ASK");
setTimeout(() => {
// If no answer after given timeout
if (ongoing_request.passports.length == 0) {
reject({ error: "No MQTT passports found" });
// Else send a deep clone of the answers (else it's empty)
} else {
resolve(JSON.parse(JSON.stringify(ongoing_request.passports)));
}
// Delete the current request object and 'close it'
ongoing_request.passports.length = 0;
ongoing_request.isOpen = false;
ongoing_request.model_Name = ""
}, process.env.mqtt_timeout || 2000)
})
}
};
А вот слушатель MQTT:
con.on("message", (topic, message) => {
// If a passport is received check the topic and if there is a request opened
if (_checkTopic(topic) && ongoing_request.isOpen) {
try {
ongoing_request.passports.push(JSON.parse(message));
} catch (error) {
// do stuff if error
}
}
}
})
Я знаю, что проблема связана с логическим значением, которое я использую, чтобы указать, выполняется ли запрос в данный момент, я думал о создании объектадля каждого нового запроса и идентифицируйте их по уникальному идентификатору (например, по метке времени), но у меня нет способа заставить список MQTT узнать этот уникальный идентификатор.
У меня есть какое-то другое решение, но я не уверен, что они будут работать, и я чувствую, что есть способ справиться с этим, о котором я не знаю.
Хорошего дня.