Node.js + socket.io + node-amqp и очереди связываются при «повторном» подключении через socket.io - PullRequest
3 голосов
/ 29 сентября 2011

У меня есть один сценарий, который очень близок к этому образцу:

Один главный экран:

  • этот экран (на стороне клиента) подключится к серверу socket.io через сервер: 9090 / scope (io.connect ("http://server:9090/scope))) и отправит одно событие" userBindOk "(socket.emit (" userBindOk ", сообщение)) серверу socket.io;

  • сервер получает соединение и «userBindOk». В этот момент сервер должен получить активное соединение с сервером rabbitmq и привязать очередь к соответствующему пользователю, который только что подключился к приложению через socket.io. образец:

    socket.on («соединение», функция (клиент) { // идентификатор клиента 1234 // связать rabbitmq exchange, queue и: queue.subscribe (// получить обратный вызов); })

  • Пока проблем нет - я могу отправлять / получать сообщения через socket.io без проблем.

  • НО, если я обновлю страницу, все эти шаги будут выполнены снова. Как следствие, привязка к очереди произойдет, но на этот раз связана с другим сеансом клиента socket.io. Это означает, что если я отправлю сообщение в очередь, которая относится к первому сеансу socket.io (до обновления страницы), эта привязка должна (я думаю) получить сообщение и отправить его недопустимому клиенту socket.io (страница обновить = новый client.id в контексте socket.io). Я могу доказать это, потому что каждый раз, когда я обновляю страницу, мне нужно отправлять в x раз больше сообщений. Например: я подключился впервые: - так, 1 сообщение - одно обновление экрана; обновить страницу: мне нужно отправить 2 сообщения в очередь, и только «второе» сообщение будет получено из «фактического» сеанса клиента socket.io - такое поведение будет происходить столько, сколько я обновляю страницу (20 обновлений страницы, 20 сообщений для отправки в очередь и сервер socket.io «последний» клиент отправит сообщение клиенту socket.io для вывода на экран).

Решения, которые я считаю:

  • Найдите способ «отсоединить» очередь при отключении от сервера socket.io - я еще не видел эту опцию в api node-amqp (ожидание этого: D)

  • найти способ переподключения клиента socket.io, используя тот же client.id. Таким образом, я могу определить клиент, который приходит, и применить некоторую логику для кэширования сокета.

Есть идеи? Я пытался быть очень ясным ... Но, как вы знаете, не так легко разоблачить вашу проблему, пытаясь прояснить что-то очень специфичное для некоторого контекста ...

ТКС

1 Ответ

1 голос
/ 05 ноября 2011

Я решил это следующим образом:

Я использовал для объявления очереди rabbitMq как durable = true, autoDelete = false, exclusive = false, и в моем приложении была 1 очередь / пользователь и 1 обмен (type =прямое) с помощью routing_key name = queueName, мое приложение также использовало очередь для других клиентов, таких как браузеры, такие как приложение для Android или приложение для iphone, как запасной вариант, поэтому я использую для создания 1 очереди для пользователя earch.

Решениечтобы эта проблема состояла в том, чтобы изменить мою очередь rabbitMQ и объявить обмен.Теперь я объявляю exchange / user как fanout и autoDelete = True, и у пользователя будет N очередей с durable = true, autoDelete = true, exclusive = true (№ queue = № клиентов), и все очереди привязаныto user-exchange (multicast).

ПРИМЕЧАНИЕ: мое приложение написано на django, и я использую node + socket + amqp, чтобы иметь возможность общаться с браузером с помощью web.scokets, поэтому я использую node-Рестлер для запроса моего приложения API для получения информации об очереди пользователя.

Это сторона rabbitMQ, для узла + amqp + socket я сделал это:

на стороне сервера:

  • onConnect: объявление пользовательского обмена как fanout, autoDelete, durable.затем объявление очереди как длительной, автоматически удаляемой и исключительной, затем queue.bind к пользовательскому обмену и, наконец, queue.subscribe и socket.disconnect уничтожит очередь, так что будет существовать очередь, так как клиент подключил приложение иэто решает проблему обновления и позволяет пользователю иметь более 1 вкладки окна с приложением:

На стороне сервера:

            /*
             * unCaught exception handler
             */

            process.on('uncaughtException', function (err) {
                sys.p('Caught exception: ' + err);
                global.connection.end();
            });


            /*
             * Requiere libraries
             */

            global.sys =  require('sys');
            global.amqp = require('amqp');
            var rest = require('restler');
            var io = require('socket.io').listen(8080);

            /*
             * Module global variables
             */
            global.amqpReady = 0;


            /*
             * RabbitMQ connection
             */

            global.connection = global.amqp.createConnection({
                             host: host,
                             login: adminuser,
                             password: adminpassword,
                             vhost: vhost
                            });

            global.connection.addListener('ready', 
                        function () {
                            sys.p("RabbitMQ connection stablished");
                            global.amqpReady = 1;
                        }
            );


            /*
             * Web-Socket declaration
             */ 

            io.sockets.on('connection', function (socket) {
                socket.on('message', function (data) {
                    sys.p(data);
                    try{
                        var message = JSON.parse(data);                 
                    }catch(error){
                        socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400}));
                        var message = {};
                    }           
                    var message = JSON.parse(data);
                    if(message.token != undefined) {

                      rest.get("http://dev.kinkajougames.com/api/push",
                                {headers: 
                                    {
                                        "x-geochat-auth-token": message.token 
                                    }
                                }).on('complete', 
                                    function(data) {
                                        a = data;
                                }).on('success',
                                    function (data){
                                        sys.p(data);
                                        try{                                
                                            sys.p("---- creating exchange");
                                            socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true});
                                            sys.p("---- declarando queue");
                                            socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false},
                                                function (){
                                                    sys.p("---- bind queue to exchange");
                                                    //socket.q.bind(socket.exchange, "*");
                                                    socket.q.bind(socket.exchange, "*");
                                                    sys.p("---- subscribing queue exchange");
                                                    socket.q.subscribe(function (message) {
                                                        socket.emit("message", message.data.toString());
                                                    });     
                                                }
                                            );
                                        }catch(err){
                                            sys.p("Imposible to connection to rabbitMQ-server");
                                        }                                   

                                }).on('error', function (data){
                                    a = {
                                        data: data,
                                    };
                                }).on('400', function() {
                                    socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400}));
                                }).on('401', function() {
                                    socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                                });               
                    }
                    else {
                      socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
                    }

                });
                socket.on('disconnect', function () {
                    socket.q.destroy(); 
                    sys.p("closing socket");
                });
            });

на стороне клиента:

  • Интервал сокета с опциями 'force new connection' = true и 'отключение синхронизации при unload' = false.
  • Клиентская сторона использует события объекта onbeforeunload и onunload для окон для отправки сокета.disconnect
  • Клиент по событию socket.connect отправляет токен пользователя на узел.
  • обрабатывает сообщение из сокета

            var socket;
            function webSocket(){
                //var socket = new io.Socket();
                socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false});
                //socket.connect();
    
                onSocketConnect = function(){
                    alert('Connected');
                    socket.send(JSON.stringify({
                        token: Get_Cookie('liveScoopToken')
                    }));
                };
    
                socket.on('connect', onSocketConnect);
                socket.on('message', function(data){
                    message = JSON.parse(data);
                    if (message.action == "chat") {
                        if (idList[message.data.sender] != undefined) {
                            chatboxManager.dispatch(message.data.sender, {
                                first_name: message.data.sender
                            }, message.data.message);
                        }
                        else {
                            var username = message.data.sender;
                            Data.Collections.Chats.add({
                                id: username,
                                title: username,
                                user: username,
                                desc: "Chat",
                                first_name: username,
                                last_name: ""
                            });
                            idList[message.data.sender] = message.data.sender;
                            chatboxManager.addBox(message.data.sender, {
                                title: username,
                                user: username,
                                desc: "Chat",
                                first_name: username,
                                last_name: "",
                                boxClosed: function(id){
                                    alert("closing");
                                }
                            });
                            chatboxManager.dispatch(message.data.sender, {
                                first_name: message.data.sender
                            }, message.data.message);
                        }
                    }
                });
            }                           
    
            webSocket();
    
            window.onbeforeunload = function() {
                return "You have made unsaved changes. Would you still like to leave this page?";
            }
    
            window.onunload = function (){
                socket.disconnect();
            }
    

И это все, так что больше не нужно обворовывать сообщения.

...