Проблема прослушивания источника событий узла в потоке событий, отправленных сервером (сервер создает прослушиватель для каждого активного пользователя) - PullRequest
0 голосов
/ 01 мая 2020

У меня есть функция, настроенная для опроса стороннего API, и я использую отправленные сервером события вместе с отправителем событий для доставки обновлений клиентам:

// server.js
const interval = 60 * 60 * 1000;
setInterval(async () => {
  const toStream = await poll();
  app.emit('update', toStream);
}, interval);
    // sync.js
    router.get('/stream', [auth], (req, res) => {
      const user = req.user.id;
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Content-Encoding': 'none',
        Connection: 'keep-alive'
      });
      req.app.on('update', toStream => {
        if (toStream.some(toStreamE => toStreamE.user === user)) {
          res.write('event: message\n');
          res.write(
            `data: ${JSON.stringify(
              toStream.find(toStreamE => toStreamE.user === user)
            )}\n`
          );
          res.write('\n\n');
        }
      });
    });

И это работает , когда у вас есть пара пользователей. Проблема заключается в том, что сервер создает прослушиватель для каждого клиентского соединения, и, как только у вас будет дюжина из них, будет достигнут предел для прослушивателей для одного события, и последующие соединения с потоком будут отброшены. У меня вопрос, как мне достичь той же функциональности, но с одним слушателем? Где я должен положить это? Я также пытался ссылаться на приложение другим способом, но это не помогло:

app.use('/api/sync', require('./routes/sync')(app));

РЕДАКТИРОВАТЬ: Добавил ответ на свой вопрос

1 Ответ

0 голосов
/ 02 мая 2020

Это было сложно, но я понял это. Но сначала стоит упомянуть, что узел на самом деле не ограничивает количество слушателей, несмотря на выдачу предупреждения, как я изначально думал. В любом случае, чтобы это работало, я создал отдельный модуль, содержащий объект, на котором размещен эмиттер, плюс массив, который отслеживает ваши соединения с потоком SSE:

// emitter.js

const EventEmitter = require('events').EventEmitter;

const emitterObj = {
  emitter: new EventEmitter(),
  connections: [],
  emitFunc: function (ev, data) {
    this.emitter.emit(ev, data);
  },
  addCon: function (res) {
    if (this.connections.some(resE => resE.req.user.id === res.req.user.id)) {
      this.connections.splice(
        this.connections.findIndex(
          resE => resE.req.user.id === res.req.user.id
        ),
        1,
        res
      );
    } else {
      this.connections.push(res);
    }
    const conTimeout = 65 * 60 * 1000;
    setTimeout(() => {
      this.connections.splice(
        this.connections.findIndex(
          resE => resE.req.user.id === res.req.user.id
        ),
        1
      );
    }, conTimeout);
  }
};

emitterObj.listenFunc = emitterObj.emitter.on('update', toStream => {
  emitterObj.connections.map(resE => {
    if (toStream.some(streamE => streamE.user === resE.req.user.id)) {
      const data = toStream.find(streamE => streamE.user === resE.req.user.id);
      resE.write('event: message\n');
      resE.write(`data: ${JSON.stringify(data)}\n`);
      resE.write('\n\n');
    }
  });
});

module.exports = emitterObj;

В моей реализации я ограничиваю пользователей одно параллельное соединение с потоком, хотя при необходимости его можно изменить. Кроме того, их соединение должно быть в конечном итоге удалено из массива, иначе оно будет расти и замедлять все, я решил сделать это через 65 минут. В слушателе я просматриваю массив соединений и отправляю пользователям обновления, если они есть. Конечно, теперь вы должны использовать тот же излучатель на сервере. js также:

// server.js

const emitterObj = require('./emitter');

// ...

const interval = 60 * 60 * 1000;
setInterval(async () => {
  const toStream = await poll();
  emitterObj.emitFunc('update', toStream);
}, interval);

Маршрут syn c теперь выглядит следующим образом:

// sync.js

router.get('/stream', [auth], (req, res) => {
  emitterObj.addCon(res);
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Content-Encoding': 'none',
    Connection: 'keep-alive'
  });
  emitterObj.listenFunc;
});

Надеюсь это помогает кому-то.

РЕДАКТИРОВАТЬ: Очевидно, req.on ('close') очень помогает в моем случае. Я добавил его, чтобы удалить соединение (все еще есть таймер на случай, если он по какой-то причине не срабатывает). Кроме того, клиент должен периодически проверяться, чтобы он не разрывал соединение. Самая большая проблема здесь в том, что Firefox не восстанавливает его, если это произойдет, в отличие от Chrome. Мои дополнения:

  const intervalID = setInterval(() => {
    res.write(`event: ping\n`);
    res.write(`data: keep connection alive\n\n`);
  }, 60 * 1000);
  req.on('close', () => {
    clearInterval(intervalID);
    emitterObj.deleteCon(req);
  });



  deleteCon: function (req) {
    if (this.connections.some(resE => resE.req.user.id === req.user.id)) {
      this.connections.splice(
        this.connections.findIndex(resE => resE.req.user.id === req.user.id),
        1
      );
    }
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...