Как узнать, когда указанное сообщение c поступило в очередь RabbitMQ и NodeJs - PullRequest
0 голосов
/ 10 марта 2020

Я хочу установить sh связь между двумя микроуслугами (NodeJS), которые связаны с RabbitMQ. Предполагается, что два микросервиса имеют две отдельные базы данных. Я создал специальный обмен для двух микросервисов. Этот обмен имеет 4 очереди.

1. Запросы от А до Б

2. Ответы от Б до А

3. Запросы от Б до А

4. Ответы от А до В

Это упрощает и реализует симплексную связь между двумя микроуслугами. (Simplex - это просто) Они Когда микросервису A нужно извлечь данные из микросервиса B, он отправляет сообщение через RabbitMQ через соответствующую очередь. Я думаю об отправке сообщения в формате JSON с уникальным идентификатором для каждого запроса.

B предполагает отправку данных с этим уникальным идентификатором (чтобы дифференцировать запрос, которому он принадлежит) для запроса как ответ на соответствующую очередь.

Отправка части не является проблемой. Но в принимающей части, как я могу получить точный ответ (по его уникальному идентификатору) без получения всех доступных ответов в очереди сообщений?

Я не хочу получать все доступные сообщения (ответ) , Чтение последнего сообщения alson не будет работать из-за асин c характера NodeJS.

В настоящее время код моего потребителя (для чтения ответа) выглядит следующим образом:

 const amqp = require('amqplib');
const MongoLogger = require('./mongo');
let queue = 'test';
// Set your config here...
let config = {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'rumesh',
    password: 'password',
    locale: 'en_US',
    frameMax: 0,
    heartbeat: 0,
    vhost: '/',
};


async function start() {
    try {
        const conn = await createConnection(config);
        let queue = 'test';
        let exchange = 'test-exchange';
        let key = 'python-key';
        let exchange_type = 'direct';
        console.log("Connected to AMQP server.");
        let channel = await conn.createChannel();
        await channel.assertExchange(exchange, exchange_type, {durable: true});
        await channel.assertQueue(queue, { durable: true});

        startPollingForMessages(channel);
        //startSendingMessages(channel);
    } catch (err) {
        console.error("start: Connection error:",err.message);
    }
}

async function createConnection(config) {
    const conn = await amqp.connect(config);

    conn.on("error", function(err) {
        console.error("Connection error:",err.message);
    });

    conn.on("close", function() {
        console.error("Connection closed:", err.message);
    });

    return conn;
}

function startSendingMessages(channel) {
    const SEND_INTERVAL = 5000;
    setInterval(() => {
        sendMessage(channel, queue, JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" }));
    }, SEND_INTERVAL);
}

async function sendMessage(channel, queue, messageContent) {
    console.log(`sendMessage: sending message: ${messageContent}...`);
    return channel.sendToQueue(queue, Buffer.from(messageContent))
}

function startPollingForMessages(ch) {
    ch.consume(queue, (msg) => {
        onNewMessage(msg);
        ch.ack(msg);
    });
}

function onNewMessage(msg) {
    // Do your database stuff or whatever here....
    let m = msg.content.toString();
    console.log("On new message:", m);
    let logger = new MongoLogger();
    logger.sendMessage(m).then(()=>{
        //console.log("Msg sent to db")
    })
}

start();
...