Я хочу установить sh связь между двумя микроуслугами (NodeJS), которые связаны с RabbitMQ. Предполагается, что два микросервиса имеют две отдельные базы данных. Я создал специальный обмен для двух микросервисов. Этот обмен имеет 4 очереди.
1. Запросы от А до Б
2. Ответы от Б до А
3. Запросы от Б до А
4. Ответы от А до В
Это упрощает и реализует симплексную связь между двумя микроуслугами. (Simplex - это просто) Они Когда микросервису A нужно извлечь данные из микросервиса B, он отправляет сообщение через RabbitMQ через соответствующую очередь. Я думаю об отправке сообщения в формате JSON с уникальным идентификатором для каждого запроса.
B предполагает отправку данных с этим уникальным идентификатором (чтобы дифференцировать запрос, которому он принадлежит) для запроса как ответ на соответствующую очередь.
Отправка части не является проблемой. Но в принимающей части, как я могу получить точный ответ (по его уникальному идентификатору) без получения всех доступных ответов в очереди сообщений?
Я не хочу получать все доступные сообщения (ответ) , Чтение последнего сообщения alson не будет работать из-за асин c характера NodeJS.
В настоящее время код моего потребителя (для чтения ответа) выглядит следующим образом:
const amqp = require('amqplib');
const MongoLogger = require('./mongo');
let queue = 'test';
// Set your config here...
let config = {
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'rumesh',
password: 'password',
locale: 'en_US',
frameMax: 0,
heartbeat: 0,
vhost: '/',
};
async function start() {
try {
const conn = await createConnection(config);
let queue = 'test';
let exchange = 'test-exchange';
let key = 'python-key';
let exchange_type = 'direct';
console.log("Connected to AMQP server.");
let channel = await conn.createChannel();
await channel.assertExchange(exchange, exchange_type, {durable: true});
await channel.assertQueue(queue, { durable: true});
startPollingForMessages(channel);
//startSendingMessages(channel);
} catch (err) {
console.error("start: Connection error:",err.message);
}
}
async function createConnection(config) {
const conn = await amqp.connect(config);
conn.on("error", function(err) {
console.error("Connection error:",err.message);
});
conn.on("close", function() {
console.error("Connection closed:", err.message);
});
return conn;
}
function startSendingMessages(channel) {
const SEND_INTERVAL = 5000;
setInterval(() => {
sendMessage(channel, queue, JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" }));
}, SEND_INTERVAL);
}
async function sendMessage(channel, queue, messageContent) {
console.log(`sendMessage: sending message: ${messageContent}...`);
return channel.sendToQueue(queue, Buffer.from(messageContent))
}
function startPollingForMessages(ch) {
ch.consume(queue, (msg) => {
onNewMessage(msg);
ch.ack(msg);
});
}
function onNewMessage(msg) {
// Do your database stuff or whatever here....
let m = msg.content.toString();
console.log("On new message:", m);
let logger = new MongoLogger();
logger.sendMessage(m).then(()=>{
//console.log("Msg sent to db")
})
}
start();