Объедините несколько наблюдаемых из rxjs в один буфер - PullRequest
0 голосов
/ 05 июня 2019

У меня есть 2 подключения к веб-сокету (используя LiveQuery из Parse, если это имеет значение).

Для каждой веб-сокета у меня есть 3 события, которые я хочу захватить, "create", "update"и "delete".

Скорее всего, сразу придет куча обновлений, поэтому для одного события я использую буфер для захвата последовательности обновлений, которые проходят сразу:

const base = fromEvent(listener, type);
const triggerBuffer = base.pipe(debounceTime(500));
const buffered = base.pipe(buffer(triggerBuffer));

Это прекрасно работает.Когда с сервера поступает куча обновлений, они буферизируются и отправляются сразу.

Я хочу объединить все события из обоих веб-сокетов в одно обновление.Я пытался добиться этого с помощью оператора zip следующим образом:

const setupServerObservable = (listener, listenerName) => type => {
  const base = fromEvent(listener, type);
  return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};

const setupServerObservables = subscriptions => {
  const joins = setupServerObservable(subscriptions.joins, "joins");
  const containers = setupServerObservable(
    subscriptions.containers,
    "containers"
  );

  const eventTypes = ["update", "create", "delete"];

  const allJoins = eventTypes.map(joins);
  const allContainers = eventTypes.map(containers);
  const all = allJoins.concat(allContainers);
  const zippedObservables = zip(all);
  console.log(zippedObservables);
  const triggerBuffer = zippedObservables.pipe(debounceTime(500));
  const buffered = zippedObservables.pipe(buffer(triggerBuffer));
  return buffered;
};

Но когда я пытаюсь подписаться на этот недавно созданный Observable, он не отправляет никаких обновлений с сервера.Это console.log(zippedObservables) дает следующий вывод:

Observable {_isScalar: false, source: Observable, operator: ZipOperator}
operator: ZipOperator
    resultSelector: Array(6)
     0: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     1: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     2: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     3: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     4: Observable {_isScalar: false, source: Observable, operator: MapOperator}
     5: Observable {_isScalar: false, source: Observable, operator: MapOperator}
length: 6
__proto__: Array(0)
__proto__: Object
source: Observable {_isScalar: false, _subscribe: ƒ}
_isScalar: false
__proto__: Object

Я думаю, что, вероятно, неправильно понимаю, что делает оператор zip, но я не смог заставить его работать с merge или concatлибо.

1 Ответ

0 голосов
/ 05 июня 2019

Как только я посмотрел учебник, размещенный в комментариях, я понял, что zip не то, что я хотел.

Я попробовал то же самое с merge, но это не сработало.Кажется, что merge не принимает массив, он должен принять каждый Observable для объединения в качестве отдельного параметра, например, так:

const merged = merge(all[0], all[1], all[2], all[3], all[4], all[5]);

Итак, окончательный код:

const setupServerObservable = (listener, listenerName) => type => {
  const base = fromEvent(listener, type);
  return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};

const setupServerObservables = subscriptions => {
  const joins = setupServerObservable(subscriptions.joins, "joins");
  const containers = setupServerObservable(
    subscriptions.containers,
    "containers"
  );

  const eventTypes = ["update", "create", "delete"];

  const allJoins = eventTypes.map(joins);
  const allContainers = eventTypes.map(containers);
  const all = allJoins.concat(allContainers);
  const merged = merge(all[0], all[1], all[2], all[3], all[4], all[5]);
  const triggerBuffer = merged.pipe(debounceTime(500));
  const buffered = merged.pipe(buffer(triggerBuffer));
  return buffered;
};

Было бы неплохо, если бы был способ сделать это с массивом, но, похоже, нет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...