Я решил это следующим образом:
Я использовал для объявления очереди rabbitMq как durable = true, autoDelete = false, exclusive = false, и в моем приложении была 1 очередь / пользователь и 1 обмен (type =прямое) с помощью routing_key name = queueName, мое приложение также использовало очередь для других клиентов, таких как браузеры, такие как приложение для Android или приложение для iphone, как запасной вариант, поэтому я использую для создания 1 очереди для пользователя earch.
Решениечтобы эта проблема состояла в том, чтобы изменить мою очередь rabbitMQ и объявить обмен.Теперь я объявляю exchange / user как fanout и autoDelete = True, и у пользователя будет N очередей с durable = true, autoDelete = true, exclusive = true (№ queue = № клиентов), и все очереди привязаныto user-exchange (multicast).
ПРИМЕЧАНИЕ: мое приложение написано на django, и я использую node + socket + amqp, чтобы иметь возможность общаться с браузером с помощью web.scokets, поэтому я использую node-Рестлер для запроса моего приложения API для получения информации об очереди пользователя.
Это сторона rabbitMQ, для узла + amqp + socket я сделал это:
на стороне сервера:
- onConnect: объявление пользовательского обмена как fanout, autoDelete, durable.затем объявление очереди как длительной, автоматически удаляемой и исключительной, затем queue.bind к пользовательскому обмену и, наконец, queue.subscribe и socket.disconnect уничтожит очередь, так что будет существовать очередь, так как клиент подключил приложение иэто решает проблему обновления и позволяет пользователю иметь более 1 вкладки окна с приложением:
На стороне сервера:
/*
* unCaught exception handler
*/
process.on('uncaughtException', function (err) {
sys.p('Caught exception: ' + err);
global.connection.end();
});
/*
* Requiere libraries
*/
global.sys = require('sys');
global.amqp = require('amqp');
var rest = require('restler');
var io = require('socket.io').listen(8080);
/*
* Module global variables
*/
global.amqpReady = 0;
/*
* RabbitMQ connection
*/
global.connection = global.amqp.createConnection({
host: host,
login: adminuser,
password: adminpassword,
vhost: vhost
});
global.connection.addListener('ready',
function () {
sys.p("RabbitMQ connection stablished");
global.amqpReady = 1;
}
);
/*
* Web-Socket declaration
*/
io.sockets.on('connection', function (socket) {
socket.on('message', function (data) {
sys.p(data);
try{
var message = JSON.parse(data);
}catch(error){
socket.emit("message", JSON.stringify({"error": "invalid_params", "code": 400}));
var message = {};
}
var message = JSON.parse(data);
if(message.token != undefined) {
rest.get("http://dev.kinkajougames.com/api/push",
{headers:
{
"x-geochat-auth-token": message.token
}
}).on('complete',
function(data) {
a = data;
}).on('success',
function (data){
sys.p(data);
try{
sys.p("---- creating exchange");
socket.exchange = global.connection.exchange(data.data.bind, {type: 'fanout', durable: true, autoDelete: true});
sys.p("---- declarando queue");
socket.q = global.connection.queue(data.data.queue, {durable: true, autoDelete: true, exclusive: false},
function (){
sys.p("---- bind queue to exchange");
//socket.q.bind(socket.exchange, "*");
socket.q.bind(socket.exchange, "*");
sys.p("---- subscribing queue exchange");
socket.q.subscribe(function (message) {
socket.emit("message", message.data.toString());
});
}
);
}catch(err){
sys.p("Imposible to connection to rabbitMQ-server");
}
}).on('error', function (data){
a = {
data: data,
};
}).on('400', function() {
socket.emit("message", JSON.stringify({"error": "connection_error", "code": 400}));
}).on('401', function() {
socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
});
}
else {
socket.emit("message", JSON.stringify({"error": "invalid_token", "code": 401}));
}
});
socket.on('disconnect', function () {
socket.q.destroy();
sys.p("closing socket");
});
});
на стороне клиента:
- Интервал сокета с опциями 'force new connection' = true и 'отключение синхронизации при unload' = false.
- Клиентская сторона использует события объекта onbeforeunload и onunload для окон для отправки сокета.disconnect
- Клиент по событию socket.connect отправляет токен пользователя на узел.
обрабатывает сообщение из сокета
var socket;
function webSocket(){
//var socket = new io.Socket();
socket = io.connect("ws.dev.kinkajougames.com", {'force new connection':true, 'sync disconnect on unload': false});
//socket.connect();
onSocketConnect = function(){
alert('Connected');
socket.send(JSON.stringify({
token: Get_Cookie('liveScoopToken')
}));
};
socket.on('connect', onSocketConnect);
socket.on('message', function(data){
message = JSON.parse(data);
if (message.action == "chat") {
if (idList[message.data.sender] != undefined) {
chatboxManager.dispatch(message.data.sender, {
first_name: message.data.sender
}, message.data.message);
}
else {
var username = message.data.sender;
Data.Collections.Chats.add({
id: username,
title: username,
user: username,
desc: "Chat",
first_name: username,
last_name: ""
});
idList[message.data.sender] = message.data.sender;
chatboxManager.addBox(message.data.sender, {
title: username,
user: username,
desc: "Chat",
first_name: username,
last_name: "",
boxClosed: function(id){
alert("closing");
}
});
chatboxManager.dispatch(message.data.sender, {
first_name: message.data.sender
}, message.data.message);
}
}
});
}
webSocket();
window.onbeforeunload = function() {
return "You have made unsaved changes. Would you still like to leave this page?";
}
window.onunload = function (){
socket.disconnect();
}
И это все, так что больше не нужно обворовывать сообщения.