Я довольно новичок в Node.js и ES6, и это меня немного смущает. Я пытаюсь оставить процесс запущенным, потребляя сообщения из очереди RabbitMQ. Он должен быть в состоянии обработать сообщение (что занимает около 30-60 секунд), прежде чем оно захватит следующее сообщение. В настоящее время код, который у меня есть, захватывает все возможные сообщения и затем пытается обработать процессы. Когда в очереди 3-5 сообщений, это нормально, но для 20, 50 или 100 сообщений это приводит к нехватке памяти на сервере.
Я попытался сделать функцию обратного вызова .consume()
asyn c и добавление await
к функции обработки сообщений. Я попытался обернуть await new Promise
в .consume()
callback вокруг processMessage
. Я попытался добавить await
в строку, которая вызывает channel.consume
. Ничто не меняет поведение.
#!/usr/bin/env node
const amqp = require('amqplib');
const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue, { durable: durable, autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue, message => {
if (message !== null) {
console.log(' [x] Received', message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error, 'Queue is empty!')
channel.reject(message);
}
}, {noAck: isNoAck});
} catch (error) {
console.log(error, 'Failed to consume messages from Queue!')
cluster.close();
}
}
exports.consumeFromQueue = consumeFromQueue;
В качестве идентификатора, если я создаю массив строк и l oop через строки, когда я добавляю await в строку processMessage
, он ждет выполнения процесса (30-60 секунд) перед обработкой следующей строки.
(async () => {
for (let i=0; i<urls.length; i++) {
await processMessage(urls[i]);
}
})();
Так что мне в основном нужно что-то вроде этого, но с прослушиванием очереди в RabbitMQ.