У меня есть 2 подключения к веб-сокету (используя LiveQuery
из Parse
, если это имеет значение).
Для каждой веб-сокета у меня есть 3 события, которые я хочу захватить, "create"
, "update"
и "delete"
.
Скорее всего, сразу придет куча обновлений, поэтому для одного события я использую буфер для захвата последовательности обновлений, которые проходят сразу:
const base = fromEvent(listener, type);
const triggerBuffer = base.pipe(debounceTime(500));
const buffered = base.pipe(buffer(triggerBuffer));
Это прекрасно работает.Когда с сервера поступает куча обновлений, они буферизируются и отправляются сразу.
Я хочу объединить все события из обоих веб-сокетов в одно обновление.Я пытался добиться этого с помощью оператора zip
следующим образом:
const setupServerObservable = (listener, listenerName) => type => {
const base = fromEvent(listener, type);
return base.pipe(map(x => ({ [listenerName]: { [type]: x } })));
};
const setupServerObservables = subscriptions => {
const joins = setupServerObservable(subscriptions.joins, "joins");
const containers = setupServerObservable(
subscriptions.containers,
"containers"
);
const eventTypes = ["update", "create", "delete"];
const allJoins = eventTypes.map(joins);
const allContainers = eventTypes.map(containers);
const all = allJoins.concat(allContainers);
const zippedObservables = zip(all);
console.log(zippedObservables);
const triggerBuffer = zippedObservables.pipe(debounceTime(500));
const buffered = zippedObservables.pipe(buffer(triggerBuffer));
return buffered;
};
Но когда я пытаюсь подписаться на этот недавно созданный Observable
, он не отправляет никаких обновлений с сервера.Это console.log(zippedObservables)
дает следующий вывод:
Observable {_isScalar: false, source: Observable, operator: ZipOperator}
operator: ZipOperator
resultSelector: Array(6)
0: Observable {_isScalar: false, source: Observable, operator: MapOperator}
1: Observable {_isScalar: false, source: Observable, operator: MapOperator}
2: Observable {_isScalar: false, source: Observable, operator: MapOperator}
3: Observable {_isScalar: false, source: Observable, operator: MapOperator}
4: Observable {_isScalar: false, source: Observable, operator: MapOperator}
5: Observable {_isScalar: false, source: Observable, operator: MapOperator}
length: 6
__proto__: Array(0)
__proto__: Object
source: Observable {_isScalar: false, _subscribe: ƒ}
_isScalar: false
__proto__: Object
Я думаю, что, вероятно, неправильно понимаю, что делает оператор zip
, но я не смог заставить его работать с merge
или concat
либо.