Я пытаюсь использовать этот репозиторий для обработки очереди сообщений.
Ссылка: https://github.com/rajaraodv/rabbitpubsub
Но это было восемь лет go. Поэтому мне пришлось кое-что изменить в его реализации. Например, express. js et c.
Когда я запускаю этот пример, он прослушивает порт 3000. Но в веб-браузере возникает ошибка. Я включил сюда содержимое файла app. js и package. json. Может ли кто-нибудь показать способ решить эту проблему?
введите здесь описание изображения
Приложение js
var express = require('express')
, routes = require('./routes')
, http = require('http')
, path = require('path')
, redis = require('redis')
, amqp = require('amqp');
const session = require('express-session');
const redisStore = require('connect-redis')(session);
var cookieParser = require('cookie-parser');
var favIcon = require('express-favicon');
var expressLogger = require('express-logger');
var bodyParser = require('body-parser');
var methodOverride = require('method-override');
var errorHandler = require('express-error-handler');
const router = express.Router();
var rabbitConn = amqp.createConnection({});
var chatExchange;
rabbitConn.on('ready', function () {
chatExchange = rabbitConn.exchange('chatExchange', { 'type': 'fanout' });
});
/*
Setup Express & Socket.io
*/
var app = express();
var server = http.createServer(app);
var io = require('socket.io').listen(server);
//Set xhr-polling as WebSocket is not supported by CF
io.set("transports", ["xhr-polling"]);
//Set Socket.io's log level to 1 (info). Default is 3 (debugging)
io.set('log level', 1);
/*
Also use Redis for Session Store. Redis will keep all Express sessions in it.
*/
var RedisStore = redisStore,
rClient = redis.createClient(),
sessionStore = new RedisStore({ client: rClient });
var cookieParser = cookieParser('your secret here');
app.set('port', process.env.PORT || 3000);
app.set('views', __dirname + '/views');
app.set('view engine', 'ejs');
app.use(favIcon);
app.use(expressLogger);
app.use(bodyParser);
app.use(methodOverride);
/*
Use cookieParser and session middlewares together.
By default Express/Connect app creates a cookie by name 'connect.sid'.But to scale Socket.io app,
make sure to use cookie name 'jsessionid' (instead of connect.sid) use Cloud Foundry's 'Sticky Session' feature.
W/o this, Socket.io won't work if you have more than 1 instance.
If you are NOT running on Cloud Foundry, having cookie name 'jsessionid' doesn't hurt - it's just a cookie name.
*/
app.use(cookieParser);
app.use(session({
store: sessionStore,
key: 'jsessionid',
secret: 'your secret here',
name: 'jsessionid',
proxy: true,
resave: true,
saveUninitialized: true
}));
app.use(express.static(path.join(__dirname, 'public')));
app.use(errorHandler);
app.get('/', routes.index);
app.get('/logout', function (req, res) {
req.session.destroy();
res.redirect('/');
});
/*
When the user logs in (in our case, does http POST w/ user name), store it
in Express session (which in turn is stored in Redis)
*/
app.post('/user', function (req, res) {
req.session.user = req.body.user;
res.json({ "error": "" });
});
/*
Use SessionSockets so that we can exchange (set/get) user data b/w sockets and http sessions
Pass 'jsessionid' (custom) cookie name that we are using to make use of Sticky sessions.
*/
var SessionSockets = require('session.socket.io');
var sessionSockets = new SessionSockets(io, sessionStore, cookieParser, 'jsessionid');
sessionSockets.on('connection', function (err, socket, session) {
/**
* When a user sends a chat message, publish it to chatExchange w/o a Routing Key (Routing Key doesn't matter
* because chatExchange is a 'fanout').
*
* Notice that we are getting user's name from session.
*/
socket.on('chat', function (data) {
var msg = JSON.parse(data);
var reply = { action: 'message', user: session.user, msg: msg.msg };
chatExchange.publish('', reply);
});
/**
* When a user joins, publish it to chatExchange w/o Routing key (Routing doesn't matter
* because chatExchange is a 'fanout').
*
* Note: that we are getting user's name from session.
*/
socket.on('join', function () {
var reply = { action: 'control', user: session.user, msg: ' joined the channel' };
chatExchange.publish('', reply);
});
/**
* Initialize subscriber queue.
* 1. First create a queue w/o any name. This forces RabbitMQ to create new queue for every socket.io connection w/ a new random queue name.
* 2. Then bind the queue to chatExchange w/ "#" or "" 'Binding key' and listen to ALL messages
* 3. Lastly, create a consumer (via .subscribe) that waits for messages from RabbitMQ. And when
* a message comes, send it to the browser.
*
* Note: we are creating this w/in sessionSockets.on('connection'..) to create NEW queue for every connection
*/
rabbitConn.queue('', { exclusive: true }, function (q) {
//Bind to chatExchange w/ "#" or "" binding key to listen to all messages.
q.bind('chatExchange', "");
//Subscribe When a message comes, send it back to browser
q.subscribe(function (message) {
socket.emit('chat', JSON.stringify(message));
});
});
});
server.listen(app.get('port'), function () {
console.log("Express server listening on port " + app.get('port'));
});
Пакет. json
{
"name": "rabbitpubsub",
"version": "0.0.1",
"private": true,
"scripts": {
"start": "node app"
},
"dependencies": {
"amqp": "*",
"connect-redis": "*",
"cookie-parser": "*",
"ejs": "*",
"express": "*",
"express-session": "*",
"express-favicon": "*",
"express-logger": "*",
"express-error-handler": "*",
"body-parser": "*",
"method-override": "*",
"redis": "*",
"session.socket.io": "*",
"socket.io": "*"
}
}