У меня есть ситуация, когда у меня около 50 слушателей на 50 «прямых» обменах.Клиент и сервер находятся в javascript (node.js).Он использует node-amqp из postwait.Вещи работают довольно хорошо при низкой частоте сообщений.Как только частота сообщений увеличивается на ~ 5000 сообщений в минуту, в amqp.js отображается ошибка копирования буфера. Из того, что я мог проследить, max_frame_size в amqp.js зафиксировано в 131072 .
Я только что попытался удвоить значение со 128k до 256k.Но это приводит к автоматическому сбою node.js без запуска.Там нет сообщения об ошибке.Я предполагаю, что мне также нужно изменить соответствующее значение (max_frame) в файле rabbit.config.
Нужно ли мне делать что-то еще, чтобы увеличить это значение.Любые другие предложения также будут оценены.
Я приложил минимальный код для симуляции ошибки.Запустите приведенные ниже команды в 2 окнах, чтобы смоделировать ошибку
node engine-so-client.js -c 200 -p 12000
узел server-so.js
Файл server-so.js
var util= require('util')
var amqp = require('amqp');
var express = require ('express')
function httpServer(exchange) {
console.log("In httpServer start %s",exchange.name);
var port;
app = express.createServer();
app.get('/message/:routingKey/:message',function(req,res) {
exchange.publish(req.params.routingKey,{'d' : req.params.message});
res.send('Published the message '+req.params.message+'\n');
});
app.get('/register/:socket/:routingKey',function(req,res) {
var queue1 = conn.queue('',
{autoDelete: false, durable: true, exclusive: true},
function() {
console.log("Queue1 Callback");
queue1.subscribe(
function(message) {
console.log("subscribe Callback for "+req.params.routingKey + " " + message.d);
});
console.log("Queue Callback Binding with "+req.params.routingKey);
queue1.bind(exchange.name,req.params.routingKey);
});
res.send('Started the socket at '+req.params.socket+'\n');
});
app.listen(3000);
app.use(express.logger());
console.log('Started server on port %s', app.address().port);
}
function setup() {
console.log("Setup");
var exchange = conn.exchange('cf2-demo',
{'type': 'direct', durable: false}, function() {
var queue = conn.queue('',
{autoDelete: false, durable: true, exclusive: true},
function() {
console.log("Queue Callback Binding with test key");
queue.bind(exchange.name,'testKey');
});
queue.on('queueBindOk',
function() { httpServer(exchange); });
});
console.log("Completed setup %s", exchange.name);
}
var conn = amqp.createConnection({host:'localhost',
login:'guest',
password:'guest'},
{defaultExchangeName: "cf2-demo"});
conn.on('ready',setup);
Файл engine-so-client.js
var program = require('commander.js');
var util = require('util');
var http = require('http');
program
.version('0.0.1')
.option('-h, --host <host>', 'Host running server', String,'localhost')
.option('-p, --port <port>', 'Port to open to connect messages on',Number,12000)
.option('-k, --key <key>,', 'Routing Key to be used',String,'key1')
.option('-c, --count <count>','Iteration count',Number,50)
.option('-m, --mesg <mesg>','Message prefix',String,'hello')
.option('-t, --timeout', 'Timeout in ms between message posts')
.parse(process.argv);
function setup(host,port,key,mesg) {
var client = http.createClient(3000, host);
var request = client.request('GET','/register/'+port+"/"+key);
request.end();
request.on('response', function(response) {
response.on('data', function(chunk) {
postMessage(host,port,key,mesg,1);
});
});
}
function postMessage(host,port,key,mesg,count) {
var timeNow = new Date().getTime();
var mesgNow = mesg+"-"+count+"-"+port;
console.log("Type: Sent Mesg, Message: %s, Time: %s",mesgNow,timeNow);
var client1 = http.createClient(3000, host);
var request1 = client1.request('GET','/message/'+key+"/"+mesgNow);
request1.end();
count++;
if (count <100) {
setTimeout( function() { postMessage(host,port,key,mesg,count); }, 1000 );
}
}
var port = program.port;
var host = program.host;
var key = program.key;
var mesg = program.mesg;
var count = program.count;
var keys = ['key1','key2','key3','key4','key5'];
var messages = ['hello','world','good','morning','bye'];
var start=port;
for (i=0; i<count; i++) {
var index = i%keys.length;
var socket = start + i;
setup(host,socket,keys[index],messages[index]);
}
Ошибка прилагается
buffer.js:494
throw new Error('sourceEnd out of bounds');
^
Error: sourceEnd out of bounds
at Buffer.copy (buffer.js:494:11)
at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:170:10)
at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14)
at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16)
at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:172:14)
at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14)
at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16)
at frame (/home/hvram/programs/node_modules/node-amqp/amqp.js:172:14)
at header (/home/hvram/programs/node_modules/node-amqp/amqp.js:160:14)
at frameEnd (/home/hvram/programs/node_modules/node-amqp/amqp.js:205:16)