Могу ли я дождаться завершения процесса при использовании сообщений RabbitMQ с Node.js? - PullRequest
1 голос
/ 09 марта 2020

Я довольно новичок в Node.js и ES6, и это меня немного смущает. Я пытаюсь оставить процесс запущенным, потребляя сообщения из очереди RabbitMQ. Он должен быть в состоянии обработать сообщение (что занимает около 30-60 секунд), прежде чем оно захватит следующее сообщение. В настоящее время код, который у меня есть, захватывает все возможные сообщения и затем пытается обработать процессы. Когда в очереди 3-5 сообщений, это нормально, но для 20, 50 или 100 сообщений это приводит к нехватке памяти на сервере.

Я попытался сделать функцию обратного вызова .consume() asyn c и добавление await к функции обработки сообщений. Я попытался обернуть await new Promise в .consume() callback вокруг processMessage. Я попытался добавить await в строку, которая вызывает channel.consume. Ничто не меняет поведение.

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

В качестве идентификатора, если я создаю массив строк и l oop через строки, когда я добавляю await в строку processMessage, он ждет выполнения процесса (30-60 секунд) перед обработкой следующей строки.

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

Так что мне в основном нужно что-то вроде этого, но с прослушиванием очереди в RabbitMQ.

Ответы [ 2 ]

1 голос
/ 09 марта 2020

Если вы хотите ограничить количество сообщений, обрабатываемых потребителем в любой момент времени, используйте channel.prefetch () :

Указанное количество является максимальным количество сообщений, отправленных по каналу, которые могут ожидать подтверждения; как только количество сообщений не будет обработано, сервер не будет отправлять больше сообщений по этому каналу, пока одно или несколько не будут подтверждены.

То есть, если вы хотите иметь возможность обрабатывать только одно сообщение на время до перехода к следующему, установите channel.prefetch(1)

0 голосов
/ 09 марта 2020

переходя к следующему, установите channel.prefetch (1)

channel.prefetch(1)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...