Это было сложно, но я понял это. Но сначала стоит упомянуть, что узел на самом деле не ограничивает количество слушателей, несмотря на выдачу предупреждения, как я изначально думал. В любом случае, чтобы это работало, я создал отдельный модуль, содержащий объект, на котором размещен эмиттер, плюс массив, который отслеживает ваши соединения с потоком SSE:
// emitter.js
const EventEmitter = require('events').EventEmitter;
const emitterObj = {
emitter: new EventEmitter(),
connections: [],
emitFunc: function (ev, data) {
this.emitter.emit(ev, data);
},
addCon: function (res) {
if (this.connections.some(resE => resE.req.user.id === res.req.user.id)) {
this.connections.splice(
this.connections.findIndex(
resE => resE.req.user.id === res.req.user.id
),
1,
res
);
} else {
this.connections.push(res);
}
const conTimeout = 65 * 60 * 1000;
setTimeout(() => {
this.connections.splice(
this.connections.findIndex(
resE => resE.req.user.id === res.req.user.id
),
1
);
}, conTimeout);
}
};
emitterObj.listenFunc = emitterObj.emitter.on('update', toStream => {
emitterObj.connections.map(resE => {
if (toStream.some(streamE => streamE.user === resE.req.user.id)) {
const data = toStream.find(streamE => streamE.user === resE.req.user.id);
resE.write('event: message\n');
resE.write(`data: ${JSON.stringify(data)}\n`);
resE.write('\n\n');
}
});
});
module.exports = emitterObj;
В моей реализации я ограничиваю пользователей одно параллельное соединение с потоком, хотя при необходимости его можно изменить. Кроме того, их соединение должно быть в конечном итоге удалено из массива, иначе оно будет расти и замедлять все, я решил сделать это через 65 минут. В слушателе я просматриваю массив соединений и отправляю пользователям обновления, если они есть. Конечно, теперь вы должны использовать тот же излучатель на сервере. js также:
// server.js
const emitterObj = require('./emitter');
// ...
const interval = 60 * 60 * 1000;
setInterval(async () => {
const toStream = await poll();
emitterObj.emitFunc('update', toStream);
}, interval);
Маршрут syn c теперь выглядит следующим образом:
// sync.js
router.get('/stream', [auth], (req, res) => {
emitterObj.addCon(res);
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Content-Encoding': 'none',
Connection: 'keep-alive'
});
emitterObj.listenFunc;
});
Надеюсь это помогает кому-то.
РЕДАКТИРОВАТЬ: Очевидно, req.on ('close') очень помогает в моем случае. Я добавил его, чтобы удалить соединение (все еще есть таймер на случай, если он по какой-то причине не срабатывает). Кроме того, клиент должен периодически проверяться, чтобы он не разрывал соединение. Самая большая проблема здесь в том, что Firefox не восстанавливает его, если это произойдет, в отличие от Chrome. Мои дополнения:
const intervalID = setInterval(() => {
res.write(`event: ping\n`);
res.write(`data: keep connection alive\n\n`);
}, 60 * 1000);
req.on('close', () => {
clearInterval(intervalID);
emitterObj.deleteCon(req);
});
deleteCon: function (req) {
if (this.connections.some(resE => resE.req.user.id === req.user.id)) {
this.connections.splice(
this.connections.findIndex(resE => resE.req.user.id === req.user.id),
1
);
}
}