Node.js входящий буфер TCP сервера - PullRequest
7 голосов
/ 10 мая 2011

У меня есть два процесса узла, которые говорят друг с другом.Я назову их [Узловой сервер] и [Узловый отправитель] . [Node Sender] постоянно обрабатывает информацию и записывает сообщение по TCP-соединению на [Node Server] . [Node Server] затем записывает обратно сообщение о состоянии.

Пример [Node Sender]:

var message = "Test Message";
[Node Sender].connection.write(message);

Пример [Node Server]:

[Node Server].socket.on("data", function(p_data) {
    this.write("OK");

    // Do some work with p_data
}

Это работает без проблем, p_data всегда содержит «Тестовое сообщение» при отправке со скоростью более 5 миллисекунд.Однако, если я ускоряю [Node Sender] для записи каждую миллисекунду, p_data иногда заканчивается чем-то вроде "Test MessageTest MessageTes" .

Я понимаю, что буфер в [Node Sender] , вероятно, заполняется быстрее, чем отправляет его команда записи.Есть ли способ заставить соотношение один к одному при отправке сообщений, оставаясь при этом асинхронным?

Конечно, я могу просто добавить терминатор к своему сообщению и просто заполнить буфер в [Узловой сервер] , но я хотел убедиться, что не было чего-то очевидного, что я пропустил.

Ответы [ 2 ]

12 голосов
/ 11 мая 2011

Нет, вы ничего не пропустили, и да, вам нужно добавить некоторую форму завершения в ваши сообщения.

Здесь есть две основные проблемы:

  1. Протокол TCP ориентирован на поток, а не на сообщения;он не знает, что может составлять «сообщение».

  2. Событие данных, инициируемое сетевой библиотекой node.js, указывает, что некоторые данные получены, ноне имея представления о том, что может содержать сообщение, оно не может указывать на то, что оно получило какие-либо конкретные данные.

Таким образом, отправляя сообщения быстрее, чем Node может их обработать, буфер сокета recv заполняетсяс несколькими «сообщениями».

Типичным решением этой проблемы является добавление завершения строки, как можно найти в https://github.com/baudehlo/Haraka/blob/master/connection.js в строках 32-34:

self.client.on('data', function (data) {
    self.process_data(data);
});

и строки 110-134:

Connection.prototype.process_data = function (data) {
  if (this.disconnected) {
    logger.logwarn("data after disconnect from " + this.remote_ip);
    return;
  }

  this.current_data += data;
  this._process_data();
};

Connection.prototype._process_data = function() {
  var results;
  while (results = line_regexp.exec(this.current_data)) {
    var this_line = results[1];
    if (this.state === 'pause') {
        this.early_talker = 1;
        var self = this;
        // If you talk early, we're going to give you a delay
        setTimeout(function() { self._process_data() }, this.early_talker_delay);
        break;
    }
    this.current_data = this.current_data.slice(this_line.length);
    this.process_line(this_line);
  }
};
9 голосов
/ 27 декабря 2012

Вам необходимо накопить входящие данные буфера, чтобы получить ваше полное сообщение. пожалуйста, обратитесь к приведенному ниже примеру. этот сервер ожидает данные с 4-х байтовым заголовком и телом сообщения. заголовок - беззнаковое целое, что означает общую длину тела, а тело - строковые данные с разделителем '|' обратите внимание, что возможно, что этот «заголовок и сообщение» не были получены одновременно. поэтому нам нужно накапливать входящие данные, пока мы не получим полную длину данных. И также возможно, что несколько «заголовок и сообщение» получаются одновременно. Дело в том, что нам нужно накопить данные.

var SERVER_PORT = 8124;
var TCP_DELIMITER = '|';
var packetHeaderLen = 4; // 32 bit integer --> 4

var server = net.createServer( function(c) {
    var accumulatingBuffer = new Buffer(0); 
    var totalPacketLen   = -1; 
    var accumulatingLen  =  0;
    var recvedThisTimeLen=  0;
    var remoteAddress = c.remoteAddress;
    var address= c.address();
    var remotePort= c.remotePort;
    var remoteIpPort = remoteAddress +":"+ remotePort;

    console.log('-------------------------------'+remoteAddress);
    console.log('remoteIpPort='+ remoteIpPort); 

    c.on('data', function(data) {
        console.log('received data length :' + data.length ); 
        console.log('data='+ data); 

        recvedThisTimeLen = data.length;
        console.log('recvedThisTimeLen='+ recvedThisTimeLen);

        //accumulate incoming data
        var tmpBuffer = new Buffer( accumulatingLen + recvedThisTimeLen );
        accumulatingBuffer.copy(tmpBuffer);
        data.copy ( tmpBuffer, accumulatingLen  ); // offset for accumulating
        accumulatingBuffer = tmpBuffer; 
        tmpBuffer = null;
        accumulatingLen += recvedThisTimeLen ;
        console.log('accumulatingBuffer = ' + accumulatingBuffer  ); 
        console.log('accumulatingLen    =' + accumulatingLen );

        if( recvedThisTimeLen < packetHeaderLen ) {
            console.log('need to get more data(less than header-length received) -> wait..');
            return;
        } else if( recvedThisTimeLen == packetHeaderLen ) {
            console.log('need to get more data(only header-info is available) -> wait..');
            return;
        } else {
            console.log('before-totalPacketLen=' + totalPacketLen ); 
            //a packet info is available..
            if( totalPacketLen < 0 ) {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
                console.log('totalPacketLen=' + totalPacketLen );
            }
        }    

        //while=> 
        //in case of the accumulatingBuffer has multiple 'header and message'.
        while( accumulatingLen >= totalPacketLen + packetHeaderLen ) {
            console.log( 'accumulatingBuffer= ' + accumulatingBuffer );

            var aPacketBufExceptHeader = new Buffer( totalPacketLen  ); // a whole packet is available...
            console.log( 'aPacketBufExceptHeader len= ' + aPacketBufExceptHeader.length );
            accumulatingBuffer.copy( aPacketBufExceptHeader, 0, packetHeaderLen, accumulatingBuffer.length); // 

            ////////////////////////////////////////////////////////////////////
            //process one packet data
            var stringData = aPacketBufExceptHeader.toString();
            var usage = stringData.substring(0,stringData.indexOf(TCP_DELIMITER));
            console.log("usage: " + usage);
            //call handler
            (serverFunctions [usage])(c, remoteIpPort, stringData.substring(1+stringData.indexOf(TCP_DELIMITER)));
            ////////////////////////////////////////////////////////////////////

            //rebuild buffer
            var newBufRebuild = new Buffer( accumulatingBuffer.length );
            newBufRebuild.fill();
            accumulatingBuffer.copy( newBufRebuild, 0, totalPacketLen + packetHeaderLen, accumulatingBuffer.length  );

            //init      
            accumulatingLen -= (totalPacketLen +4) ;
            accumulatingBuffer = newBufRebuild;
            newBufRebuild = null;
            totalPacketLen = -1;
            console.log( 'Init: accumulatingBuffer= ' + accumulatingBuffer );   
            console.log( '      accumulatingLen   = ' + accumulatingLen );  

            if( accumulatingLen <= packetHeaderLen ) {
                return;
            } else {
                totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
                console.log('totalPacketLen=' + totalPacketLen );
            }    
        }  
    }); 

    ...
});

См. Пример ниже.

https://github.com/jeremyko/nodeChatServer

Надеюсь, что это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...