Преобразование событий websocket в наблюдаемые - PullRequest
0 голосов
/ 10 октября 2018

Я использую библиотеку ws для создания клиента веб-сокета для связи с сервером (на основе JSON).Каждый запрос должен иметь уникальный requestId, и ответ будет иметь тот же requestId

Упрощенный пример моего кода

let requestId = 0;

const WebSocket = require('ws');
const ws = new WebSocket('ws://www.host.com/path');

ws.on('message', (json) => {
  let obj = JSON.parse(json);

  if (obj.eventType === 'SomeEvent') {
     ws.send({
       requestId,
       command: "DoSomeStuff"
     });
  }
});

Я хотел бы реализовать метод send в моем файле, который вызывает внутреннеws.send(), но возвращает наблюдаемое, которое разрешается, когда веб-сокет получает ws.on('message'), где свойство requestId совпадает с requestId.

Таким образом я могу подписаться на Observable и изменить свой код на

...
  if (obj.eventType === 'SomeEvent') {
     mySendFunction({
       requestId,
       command: "DoSomeStuff"
     }).subscribe(result => {
        // Do something with result here
     }
  }
...

Также - иногда (редко) сервер не отвечает на запрос.В таком случае, есть ли способ тайм-аута наблюдаемого в случае, если ответ не получен в течение заданного периода времени?


Я видел реализации, которые используют массив для сопоставления идентификаторов requestId с обещаниями, а затемразрешив их вручную, но я полагаю, что должен быть разумный способ сделать это напрямую с RxJS.

Есть идеи?

1 Ответ

0 голосов
/ 10 октября 2018

Очень простая вещь, которую вы можете сделать, это просто использовать new Observable():

const mySendFunction = options => new Observable(observer => {
  const handler = ws.on('message', json => {
    let obj = JSON.parse(json);

    if (obj.eventType === options.command) {
      observer.next(obj);
      observer.complete();
    }
  });

  ws.send(options);

  return () => {
    // ...cancel handler?
  };
});
...