Я использую amqplib js lib. У меня есть издатель (он публикует sh сообщений в l oop) и несколько работников. Если я публикую sh 1000000 сообщений, я вижу, как мои сообщения сначала отправляются кролику, после этого я получаю ack , и только после того, как это сообщение начинает потребляться рабочим / работникам.
Как я понимаю, при отправке сообщений кролик не может отправить ack издателю. Я прав? Как я могу решить это?
У меня есть основной файл:
let amqp = require('amqplib');
const EXCHANGE = 'simple_exchange',
EXCHANGE_TYPE = 'direct',
QUEUE = 'simple_queue',
ROUTING_KEY = 'simple_routing_key';
const defaultPublishCount = 10000;
let runInit = async() => {
let connection = await amqp.connect('amqp://localhost');
let channel = await connection.createChannel();
let commonOptions = {
durable: false
};
await channel.assertExchange(EXCHANGE, EXCHANGE_TYPE, commonOptions);
await channel.assertQueue(QUEUE, commonOptions);
await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY, commonOptions);
return channel;
};
let runPublisher = async(count, userChannel) => {
const channel = userChannel || await runInit();
let d1 = (new Date).toISOString().slice(11, 23).replace('T', ' ');
const publishCount = count || defaultPublishCount;
let index = 1;
while (index <= publishCount) {
let msg = {
id: index,
time: (new Date).toISOString().slice(11, 23).replace('T', ' ')
};
channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {
noAck: true
});
console.log(`Message sent: ${JSON.stringify(msg)}`);
index++;
}
let d2 = (new Date).toISOString().slice(11, 23).replace('T', ' ');
console.log(`\nPublish started at: ${d1}`);
console.log(`Publish finished at: ${d2}\n\n`);
};
let runWorker = async(userChannel) => {
const channel = userChannel || await runInit();
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", QUEUE);
channel.consume(QUEUE, (msg) => {
console.log(` [x] Received: '${msg.content}' at ${(new Date).toISOString().slice(11, 23).replace('T', ' ')}`);
channel.ack(msg);
});
};
module.exports = {
runInit,
runPublisher,
runWorker
};
простой издатель:
let { runPublisher } = require('./amqp_core.js');
let count = (process.argv.slice(2, 3)[0]) * 1;
runPublisher(count);
и простой рабочий:
let { runWorker } = require('./amqp_core.js');
runWorker();
результат работы: Не знаю, важно это или нет, но я сказать. Я использую этот кластер кроликов , и в нем я включил одну политику:
и подумал, что какая-то проблема в канале и я добавил еще одну тест:
(async() => {
let core = require('./amqp_core.js');
let channel = await core.runInit();
await core.runWorker(channel);
core.runPublisher(25, channel);
})();
но результат был тот же: