Исключение сокета node.js прочитано ETIMEDOUT - как мне правильно его перехватить?как насчет записи таймаутов? - PullRequest
7 голосов
/ 18 февраля 2012

У меня есть прокси-сервер, который управляет группой клиентов, а также общается с другим http-сервером.Сообщения отправляются туда и обратно и направляются клиентам.Клиенты могут и делают тайм-аут, и сервер имеет функцию сердцебиения (которая повторяется каждые n секунд), которая отправляет сердцебиение всем клиентам, которые находятся в карте clientId для подключения к сокету.

Я получаю 'чтение ETIMEDOUTисключение, когда сердцебиение пытается связаться с клиентом, который больше не подключен, но чей сокет все еще активен.Я попытался временно установить тайм-аут соединения с сокетом на 2000 мс с теорией, что мой обработчик событий сокета для тайм-аута поймает это (обработчик событий находится в серверной части tcp), но этого не произошло.Чтобы умереть, нужно несколько ударов сердца.

Отчасти проблема заключается в том, что я не понимаю, как структурировать код node.js, поэтому, если у вас есть какие-либо предложения, я буду очень признателен за них.

Другой вопрос: можно ли отдельно обрабатывать таймауты чтения и записи или, по крайней мере, их устранять?Что я действительно хотел бы сделать, так это чтобы моя функция биения была частью tcp-сервера и отправляла биение только в том случае, если он не получил ответа от клиента в течение, скажем, n секунд, и отправляла это биение только один раз.Если мы получаем тайм-аут, то мы убиваем сокет, в противном случае мы ждем снова.

Спасибо!

>>$ node --harmony-weakmaps server.js
Heartbeat: Sat Feb 18 2012 08:34:40 GMT+0000 (UTC)
{
    sending keep_alive to id:00:00:00:00:00:10 socket:[object Object]
}
socket:received data: {"id":"00:00:00:00:00:10","m":"keep_alive","success":"true"}

Heartbeat: Sat Feb 18 2012 08:35:40 GMT+0000 (UTC)
{
    sending keep_alive to id:00:00:00:00:00:10 socket:[object Object]
}


socket:received data: {"id":"00:00:00:00:00:10","m":"keep_alive","success":"true"}
node.js:201
        throw e; // process.nextTick error, or 'error' event on first tick
              ^
Error: read ETIMEDOUT
    at errnoException (net.js:642:11)
    at TCP.onread (net.js:375:20)

Функция сердцебиения, которая запускает тайм-аут:

console.log("Starting heartbeat");
var beat_period = 60;
setInterval(function() {
    if(Object.keys(id2socket).length != 0){
        console.log("Heartbeat: " + new Date());
        //for (var key in id2socket) {
        //  console.log("\t"+key+"->"+id2socket[key]);
        //}
        console.log("{");
        for(var id in id2socket) {
            var socket = id2socket[id];
            // don't want sockets to time out
            socket.setTimeout(2000); // for heartbeat, set the timeout
            try {
                console.log("\tsending keep_alive to id:"+id+" socket:"+id2socket[id]);
                socket.write('{"m":"keep_alive"}\r\n');
            } catch(Error) {
                console.log("Heartbeat:Cannot find id:"+id);
                removeSocketFromMap(id,socket);
                // TODO: send message to API
            }
            socket.setTimeout(0); // no timeout
        }
        console.log("}");
    }
}, beat_period * 1000);

server.js:

// Launch Instructions
// node --harmony-weakmaps server.js

var net = require('net'); // tcp-server
var http = require("http"); // http-server
var querystring = require('querystring');

// Map of sockets to clients
var id2socket = new Object;
var socket2id = new WeakMap; // allows us to use object as key to hash

// Test for client:
// {"id":"123","m":"add"} // establishes connection and puts client into id2socket map
// {"id":"123","m":"test"} // sends a message through to API

// HTTP:POST outbound function
// http://www.theroamingcoder.com/node/111
function postOut(dataToPost){
    try{
        console.log("postOut msg:"+JSON.stringify(dataToPost));
    } catch (Error) {
        console.log("postOut error:"+Error);
    }

    var post_domain = '127.0.0.1';
    var post_port = 80;
    var post_path = '/cgi-bin/index3.py'; 

    var post_data = querystring.stringify({
            'act' : 'testjson',
            'json' : JSON.stringify(dataToPost)
    });
    console.log("post_data:"+post_data);

    var post_options = {
      host: post_domain,
      port: post_port,
      path: post_path,
      method: 'POST',
      headers: {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Content-Length': post_data.length
      }
    };
    var post_req = http.request(post_options, function(res) {
      res.setEncoding('utf8');
      res.on('data', function (chunk) {
        console.log('Response:data: ' + chunk);
      });
    });

    // Handle various issues
    post_req.on('error', function(error) {
        console.log('ERROR' + error.message);
        // If you need to go on even if there is an error add below line
        //getSomething(i + 1);
    });
    post_req.on("response", function (response) {
        console.log("Response:response:"+response);
    });

    // write parameters to post body
    post_req.write(post_data);
    post_req.end();
}

function removeSocketFromMap(id,socket){
    console.log("removeSocketFromMap socket:"+socket+" id:"+id);
    delete id2socket[id];
    socket2id.delete(socket);
    //TODO: print map???
    console.log("socketmap {");
    for (var key in id2socket) {
        console.log("\t"+key+"->"+id2socket[key]);
    }
    console.log("}");
}

// Setup a tcp server
var server_plug = net.createServer(

    function(socket) {

        // Event handlers
        socket.addListener("connect", function(conn) {
            console.log("socket:connection from: " + socket.remoteAddress + ":" + socket.remotePort + " id:"+socket.id );   
        });

        socket.addListener("data", function(data) {
            console.log("socket:received data: " + data);
            var request = null;
            try {
                request = JSON.parse(data);
            } catch (SyntaxError) {
                console.log('Invalid JSON:' + data);
                socket.write('{"success":"false","response":"invalid JSON"}\r\n');
            }

            if(request!=null){
                response = request; // set up the response we send back to the client

                if(request.m=="keep_alive"){ // HACK for keep alive
                    // Do nothing
                } else if(request.m !== undefined && request['id'] !== undefined){ // hack on 'id', id is js obj property
                    if(request.m == 'connect_device' || request.m == 'add'){
                        console.log("associating uid " + request['id'] + " with socket " + socket);
                        id2socket[request['id']] = socket;
                        socket2id.set(socket, request['id']);
                    }
                    postOut(request);
                    socket.write(JSON.stringify(response)+"\r\n");
                } else if(request['id'] !== undefined){
                    postOut(request);
                    socket.write(JSON.stringify(response)+"\r\n");
                } else {
                    response['content'] = "JSON doesn't contain m or id params";
                    socket.write(JSON.stringify(response)+"\r\n");
                }
            } else {
                console.log("null request");
            }

        });

        socket.on('end', function() {
            id = socket2id.get(socket);

            console.log("socket:disconnect by id " + id);
            removeSocketFromMap(id,socket);
            socket.destroy();
        });

        socket.on('timeout', function() {
            id = socket2id.get(socket);

            console.log('socket:timeout by id ' + id);
            removeSocketFromMap(id,socket);
            socket.destroy();
        });

        // handle uncaught exceptions
        socket.on('uncaughtException', function(err) {
            id = socket2id.get(socket);

            console.log('socket:uncaughtException by id ' + id);
            removeSocketFromMap(id,socket);
            socket.destroy();
        });

    }
);
server_plug.on('error', function (error) {
    console.log('server_plug:Error: ' + error);
});

// Setup http server
var server_http = http.createServer(
    // Function to handle http:post requests, need two parts to it
    // http://jnjnjn.com/113/node-js-for-noobs-grabbing-post-content/
    function onRequest(request, response) {
        request.setEncoding("utf8");
        request.content = '';

        request.on('error', function(err){
            console.log("server_http:error: "+err);
        })

        request.addListener("data", function(chunk) {
            request.content += chunk;
        });

        request.addListener("end", function() {
            console.log("server_http:request_received");

            try {
                var json = querystring.parse(request.content);

                console.log("server_http:received_post {");
                for(var foo in json){
                    console.log("\t"+foo+"->"+json[foo]);
                }
                console.log("}");

                // Send json message content to socket
                if(json['json']!=null && json['id']!=null){
                    id = json['id'];
                    try {
                        var socket = id2socket[id];
                        socket.write(json['json']+"\r\n");
                    } catch (Error) {
                        console.log("Cannot find socket with id "+id);
                    } finally {
                        // respond to the incoming http request
                        response.end();
                        // TODO: This should really be in socket.read!
                    }
                }
            } catch(Error) {
                console.log("JSON parse error: "+Error)
            }
        });

        request.on('end', function () {
            console.log("http_request:end");
        });

        request.on('close', function () {
            console.log("http_request:close");
        });
    }
);
server_http.on('error', function (error) {
    console.log('server_http:Error: ' + error);
});

// Heartbeat function
console.log("Starting heartbeat");
var beat_period = 60;
setInterval(function() {
    if(Object.keys(id2socket).length != 0){
        console.log("Heartbeat: " + new Date());
        //for (var key in id2socket) {
        //  console.log("\t"+key+"->"+id2socket[key]);
        //}
        console.log("{");
        for(var id in id2socket) {
            var socket = id2socket[id];
            // don't want sockets to time out
            socket.setTimeout(2000); // for heartbeat, set the timeout
            try {
                console.log("\tsending keep_alive to id:"+id+" socket:"+id2socket[id]);
                socket.write('{"m":"keep_alive"}\r\n');
            } catch(Error) {
                console.log("Heartbeat:Cannot find id:"+id);
                removeSocketFromMap(id,socket);
                // TODO: send message to API
            }
            socket.setTimeout(0); // no timeout
        }
        console.log("}");
    }
}, beat_period * 1000);



// Fire up the servers
//var HOST = '127.0.0.1'; // just local incoming connections
var HOST = '0.0.0.0'; // allows access to all external IPs
var PORT = 5280;
var PORT2 = 9001;

// accept tcp-ip connections
server_plug.listen(PORT, HOST);
console.log("TCP server listening on "+HOST+":"+PORT);

// accept posts
server_http.listen(PORT2);
console.log("HTTP server listening on "+HOST+":"+PORT2);

РЕДАКТИРОВАТЬ:

Должен ли я использовать .on (событие, обратный вызов) против .onlistener (событие, обратный вызов)?

ОБНОВЛЕНИЕ:

Это не сработало, я изменил материал в tcp_server на все add_listener в сердцебиении как .on.Все еще не уловил ошибок и взорвался и сказал, что я добавил слишком много слушателей.

Ответы [ 2 ]

5 голосов
/ 25 февраля 2012

Во-первых, немного сложно сказать, правильна ли ваша структура, без некоторого понимания контекста вашего кода ...

Попробуйте добавить

socket.on('error', function() {
    id = socket2id.get(socket);

    console.log('socket:timeout by id ' + id);
    removeSocketFromMap(id,socket);
    socket.destroy();
}

анонимной функции в net.CreateServer. ETIMEDOUT - ошибка системного вызова, и node.js просто сообщает о ней. Это не может быть вызвано просто типичным «тайм-аутом». Вы говорите, что это вызвано записью Hearbeat, но похоже, что TCP.read является источником. Это может быть полузакрытая розетка.

1 голос
/ 25 февраля 2012

При проблеме исключения ETIMEDOUT пытались ли вы прослушать исключение uncaughtException для самого процесса?

process.on('uncaughtException', function (err) {
  console.log('Caught exception: ' + err);
});

См. Документацию здесь: http://nodejs.org/docs/latest/api/process.html#event_uncaughtException_

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...