Оптимизация потребителей RabbitMQ для потребления в пакетном режиме - PullRequest
0 голосов
/ 16 декабря 2018

У меня есть приложение, в котором при каждом потреблении сообщения мне нужно запрашивать базу данных MySQL для получения некоторой информации и на основе этого процесса использовать полученное сообщение.Я хотел бы оптимизировать это так, чтобы несколько запросов к базе данных не добавлялись к нагрузке.

Я думал о подходе, где я жду по крайней мере x сообщений или у секунд .Таким образом, я могу пакетно потреблять некоторые сообщения, и даже если в какой-то момент я получаю меньше сообщений, они также потребляются.

Пример: Допустим, x = 100 , y =10 секунд

Это означает, что я жду не менее 100 сообщений или 10 секунд, в зависимости от того, что произойдет раньше.Таким образом, я могу запросить базу данных сразу за 100 сообщений в одном запросе.Кроме того, если я получу менее 100 сообщений, остальные сообщения будут обрабатываться в течение максимум 10 секунд.

Я использую NodeJS с amqplib для потребления.У меня есть следующий код, основанный на примерах RabbitMQ:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

Я думал о глобальном объекте и добавлял его к каждому обратному вызову consume и проверял счет этого объекта, когда он достигает х сообщений те, которые обрабатываются.Тем не менее, вы не знаете, как добавить к этому верхний лимит времени y секунд , а также убедитесь, что если я получу меньше x сообщений в пределах временного окна, они будут обработаны

1 Ответ

0 голосов
/ 17 декабря 2018

Следующий код будет вызывать функцию после каждого полученного сообщения, которая объединяет полученные сообщения в массив.Когда он вызывается без сообщения (с аргументом null) или когда он видит, что количество сообщений достигло x, он отправляет агрегированные сообщения в функцию базы данных.В противном случае он просто добавляет сообщение в массив (во второй части оператора if).

Аргумент null передается статистической функции с помощью таймера, который срабатывает через y секунд.,Этот таймер сначала устанавливается, когда очередь сообщений только что инициализирована, и сбрасывается всякий раз, когда агрегатор отправляет сообщения в базу данных.

var messageStore = [];
var timer;

sendToDatabase = function(messages) {...}

aggregate = function(msg) {
    if (msg == null || messageStore.push(msg) == x) {
        clearTimeout(timer);
        timer = setTimeout(aggregate, 1000*y, null);
        sendToDatabase(messageStore);
        messageStore = [];
    }
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    timer = setTimeout(aggregate, 1000*y, null);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
      aggregate(msg);
    }, {noAck: true});
  });
});

Примечание. Я не смог проверить это, так как не могупод рукой нет системы обмена сообщениями.

...