Увеличение скорости max_frame в amqp.js - превышение пределов в буферной копии - PullRequest
1 голос
/ 08 февраля 2012

У меня есть ситуация, когда у меня около 50 слушателей на 50 «прямых» обменах.Клиент и сервер находятся в javascript (node.js).Он использует node-amqp из postwait.Вещи работают довольно хорошо при низкой частоте сообщений.Как только частота сообщений увеличивается на ~ 5000 сообщений в минуту, в amqp.js отображается ошибка копирования буфера. Из того, что я мог проследить, max_frame_size в amqp.js зафиксировано в 131072 .

Я только что попытался удвоить значение со 128k до 256k.Но это приводит к автоматическому сбою node.js без запуска.Там нет сообщения об ошибке.Я предполагаю, что мне также нужно изменить соответствующее значение (max_frame) в файле rabbit.config.

Нужно ли мне делать что-то еще, чтобы увеличить это значение.Любые другие предложения также будут оценены.

Я приложил минимальный код для симуляции ошибки.Запустите приведенные ниже команды в 2 окнах, чтобы смоделировать ошибку

node engine-so-client.js -c 200 -p 12000

узел server-so.js

Файл server-so.js

var util= require('util')
var amqp = require('amqp');
var express = require ('express')


function httpServer(exchange) {
   console.log("In httpServer start  %s",exchange.name);
   var port;
   app = express.createServer();

   app.get('/message/:routingKey/:message',function(req,res) {
    exchange.publish(req.params.routingKey,{'d' : req.params.message});
    res.send('Published the message '+req.params.message+'\n');
});

app.get('/register/:socket/:routingKey',function(req,res) {
    var queue1 = conn.queue('',
            {autoDelete: false, durable: true, exclusive: true},
            function() {
                    console.log("Queue1 Callback");
                    queue1.subscribe(
                    function(message) {
                            console.log("subscribe Callback for "+req.params.routingKey + " " + message.d);
                    });
                    console.log("Queue Callback Binding with "+req.params.routingKey);
                    queue1.bind(exchange.name,req.params.routingKey);
            });
    res.send('Started the socket at '+req.params.socket+'\n');
   });

   app.listen(3000);
   app.use(express.logger());
   console.log('Started server on port %s', app.address().port);
   }

   function setup() {
      console.log("Setup");
      var exchange = conn.exchange('cf2-demo',
    {'type': 'direct', durable: false}, function() {
    var queue = conn.queue('',
            {autoDelete: false, durable: true, exclusive: true},
            function() {
                    console.log("Queue Callback Binding with test key");
                    queue.bind(exchange.name,'testKey');
            });
                    queue.on('queueBindOk',
                            function() { httpServer(exchange); });
   });

   console.log("Completed setup %s", exchange.name);
   }
   var conn = amqp.createConnection({host:'localhost',
                                    login:'guest',
                                    password:'guest'},
                             {defaultExchangeName: "cf2-demo"});
   conn.on('ready',setup);

Файл engine-so-client.js

var program = require('commander.js');
var util = require('util');
var http = require('http');

program
    .version('0.0.1')
    .option('-h, --host <host>', 'Host running server', String,'localhost')
    .option('-p, --port <port>', 'Port to open to connect messages on',Number,12000)
    .option('-k, --key <key>,',  'Routing Key to be used',String,'key1')
    .option('-c, --count <count>','Iteration count',Number,50)
    .option('-m, --mesg <mesg>','Message prefix',String,'hello')
    .option('-t, --timeout', 'Timeout in ms between message posts')
    .parse(process.argv);

function setup(host,port,key,mesg) {
  var client = http.createClient(3000, host);
  var request = client.request('GET','/register/'+port+"/"+key);
  request.end();
  request.on('response', function(response) {
    response.on('data', function(chunk) {
                    postMessage(host,port,key,mesg,1);
            });
  });
 }

 function postMessage(host,port,key,mesg,count) {
    var timeNow = new Date().getTime();
    var mesgNow = mesg+"-"+count+"-"+port;
    console.log("Type: Sent Mesg, Message: %s, Time: %s",mesgNow,timeNow);
    var client1 = http.createClient(3000, host);
    var request1 = client1.request('GET','/message/'+key+"/"+mesgNow);
    request1.end();
    count++;
    if (count <100) {
       setTimeout( function() { postMessage(host,port,key,mesg,count); }, 1000 );
    }
  }
  var port = program.port;
  var host = program.host;
  var key = program.key;
  var mesg = program.mesg;
  var count = program.count;
  var keys = ['key1','key2','key3','key4','key5'];
  var messages = ['hello','world','good','morning','bye'];
  var start=port;
  for (i=0; i<count; i++) {
      var index = i%keys.length;
      var socket = start + i;
      setup(host,socket,keys[index],messages[index]);
  }

Ошибка прилагается

buffer.js:494
throw new Error('sourceEnd out of bounds');
      ^
Error: sourceEnd out of bounds
at Buffer.copy (buffer.js:494:11)
at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:170:10)
at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14)
at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16)
at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:172:14)
at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14)
at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16)
at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:172:14)
at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14)
at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16)
...