RabbitMQ. публикация сообщения в цикле - PullRequest
1 голос
/ 10 января 2020

Я использую amqplib js lib. У меня есть издатель (он публикует sh сообщений в l oop) и несколько работников. Если я публикую sh 1000000 сообщений, я вижу, как мои сообщения сначала отправляются кролику, после этого я получаю ack , и только после того, как это сообщение начинает потребляться рабочим / работникам.

Как я понимаю, при отправке сообщений кролик не может отправить ack издателю. Я прав? Как я могу решить это?

У меня есть основной файл:

let amqp = require('amqplib');

const EXCHANGE = 'simple_exchange',
  EXCHANGE_TYPE = 'direct',
  QUEUE = 'simple_queue',
  ROUTING_KEY = 'simple_routing_key';

const defaultPublishCount = 10000;

let runInit = async() => {
  let connection = await amqp.connect('amqp://localhost');
  let channel = await connection.createChannel();

  let commonOptions = {
    durable: false
  };

  await channel.assertExchange(EXCHANGE, EXCHANGE_TYPE, commonOptions);
  await channel.assertQueue(QUEUE, commonOptions);
  await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY, commonOptions);

  return channel;
};

let runPublisher = async(count, userChannel) => {
  const channel = userChannel || await runInit();

  let d1 = (new Date).toISOString().slice(11, 23).replace('T', ' ');
  const publishCount = count || defaultPublishCount;

  let index = 1;
  while (index <= publishCount) {
    let msg = {
      id: index,
      time: (new Date).toISOString().slice(11, 23).replace('T', ' ')
    };
    channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {
      noAck: true
    });
    console.log(`Message sent: ${JSON.stringify(msg)}`);
    index++;
  }
  let d2 = (new Date).toISOString().slice(11, 23).replace('T', ' ');


  console.log(`\nPublish started  at: ${d1}`);
  console.log(`Publish finished at: ${d2}\n\n`);
};

let runWorker = async(userChannel) => {
  const channel = userChannel || await runInit();

  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", QUEUE);
  channel.consume(QUEUE, (msg) => {
    console.log(` [x] Received: '${msg.content}' at ${(new Date).toISOString().slice(11, 23).replace('T', ' ')}`);
    channel.ack(msg);
  });
};

module.exports = {
  runInit,
  runPublisher,
  runWorker
};

простой издатель:

let { runPublisher } = require('./amqp_core.js');

let count = (process.argv.slice(2, 3)[0]) * 1;
runPublisher(count);

и простой рабочий:

let { runWorker } = require('./amqp_core.js');

runWorker();

результат работы: 10 000 messages1 000 000 messages Не знаю, важно это или нет, но я сказать. Я использую этот кластер кроликов , и в нем я включил одну политику:

и подумал, что какая-то проблема в канале и я добавил еще одну тест:

(async() => {
  let core = require('./amqp_core.js');

  let channel = await core.runInit();

  await core.runWorker(channel);

  core.runPublisher(25, channel);
})();

но результат был тот же: img2

1 Ответ

0 голосов
/ 14 января 2020

Я не знаю, прав я или нет, но я решил эту проблему)

enter image description here ранее я использовал одно соединение и один канал для потребителя и для издателя. но нет, я открываю новый канал для каждого нового сообщения sh.

let channel = await connection.createChannel();
await channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {noAck: true});
await channel.close();

, как я видел в Rabbit Management, на вкладке каналов был только один канал, даже когда я открыл пять издателей и одного работника.

полный код:

let amqp = require('amqplib');

const EXCHANGE = 'simple_exchange',
  EXCHANGE_TYPE = 'direct',
  QUEUE = 'simple_queue',
  ROUTING_KEY = 'simple_routing_key';

const defaultPublishCount = 10000;

let runInit = async() => {
  let connection = await amqp.connect('amqp://localhost');
  let channel = await connection.createChannel();

  let commonOptions = {
    durable: false
  };

  await channel.assertExchange(EXCHANGE, EXCHANGE_TYPE, commonOptions);
  await channel.assertQueue(QUEUE, commonOptions);
  await channel.bindQueue(QUEUE, EXCHANGE, ROUTING_KEY, commonOptions);
  await channel.close();

  return connection;
};

let runPublisher = async(count) => {
  const connection = await runInit();

  let d1 = (new Date).toISOString().slice(11, 23).replace('T', ' ');
  const publishCount = count || defaultPublishCount;

  let index = 1;
  while (index <= publishCount) {
    let msg = {
      id: index,
      time: (new Date).toISOString().slice(11, 23).replace('T', ' ')
    };
    let channel = await connection.createChannel();
    await channel.publish(EXCHANGE, ROUTING_KEY, new Buffer(JSON.stringify(msg)), {
      noAck: true
    });
    await channel.close();
    console.log(`Message sent: ${JSON.stringify(msg)}`);
    index++;
  }
  let d2 = (new Date).toISOString().slice(11, 23).replace('T', ' ');


  console.log(`\nPublish started  at: ${d1}`);
  console.log(`Publish finished at: ${d2}\n\n`);
};

let runWorker = async() => {
  const connection = await runInit();
  let channel = await connection.createChannel();

  console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", QUEUE);
  channel.consume(QUEUE, (msg) => {
    console.log(` [x] Received: '${msg.content}' at ${(new Date).toISOString().slice(11, 23).replace('T', ' ')}`);
    channel.ack(msg);
  });
};

module.exports = {
  runInit,
  runPublisher,
  runWorker
};

и теперь я хочу услышать ваше мнение по этому поводу

...