Мой пример использования таков: я подключаюсь к службе с помощью веб-сокета и получаю периодические (но непредсказуемые) данные о работоспособности службы.Приложение может иметь несколько пользователей этого потока данных, поэтому я хочу поделиться им.Новые подписчики должны видеть самые последние данные о здоровье.Я также хочу закрыть веб-сокет, когда подписчиков больше нет.
Мое приложение использовало shareReplay(1)
в течение достаточно долгого времени, пока не было обнаружено, что оно пропускает базовое соединение (https://blog.strongbrew.io/share-replay-issue/)., при которомТочка, которую мы изменили на pipe(publishReplay(1), refCount)
. Оказывается, у этого также есть тонкое, чего я не ожидал:
- Подписчик A подключается и соединение через веб-сокет установлено.
- Подписчик Bправильно подключается и делится, а также получает самые последние данные.
- Отключаются и A, и B. Отключается веб-розетка
- Подключается абонент C, но ему нужно только одно значение
take(1)
.Возвращается значение, которое кэшируется publishReplay(1)
.
На шаге 4 я действительно хотел воссоздать веб-сокет. Кэшированное значение бесполезно. Параметр timewindow publishReplay
заманчиво, но не совсем то, что я хочу.
Мне удалось найти решение, используя pipe(multicast(() => new ReplaySubject(1)), refCount())
, но я не знаю Rx достаточно хорошо, чтобы понять все последствия этого.
У меня вопрос - какой лучший способ добиться желаемого поведения?
Спасибо!
Пример кода можно увидеть на https://repl.it/@bradb/MinorColdRouter Встроенный код
const { Observable, ReplaySubject } = require('rxjs');
const { tap, multicast, take, publishReplay, refCount } = require('rxjs/operators');
const log = console.log;
function eq(a, b) {
let result = JSON.stringify(a) == JSON.stringify(b);
if (!result) {
log('eq failed', a, b);
}
return result;
}
function assert(cond, msg) {
if (!cond) {
log('****************************************');
log('Assert failed: ', msg);
log('****************************************');
}
}
function delay(t) {
return new Promise(resolve => {
setTimeout(resolve, t);
});
}
let liveCount = 0;
// emitValue 1 happens at 100ms, 2 at 200ms etc
function testSource() {
return Observable.create(function(observer) {
let emitValue = 1;
liveCount++;
log('create');
let interv = setInterval(() => {
log('next --------> ', emitValue);
observer.next(emitValue);
emitValue++;
}, 100);
return () => {
liveCount--;
log('destroy');
clearInterval(interv);
};
});
}
async function doTest(name, o) {
log('\nDOTEST: ', name);
assert(liveCount === 0, 'Start off not live');
let a_vals = [];
o.pipe(take(4)).subscribe(x => {
a_vals.push(x);
});
await delay(250);
assert(liveCount === 1, 'Must be alive');
let b_vals = [];
o.pipe(take(2)).subscribe(x => {
b_vals.push(x);
});
assert(liveCount === 1, 'Two subscribers, one source');
await delay(500);
assert(liveCount === 0, 'source is destroyed');
assert(eq(a_vals, [1, 2, 3, 4]), 'a vals match');
assert(eq(b_vals, [2, 3]), 'b vals match');
let c_vals = [];
o.pipe(take(2)).subscribe(x => {
c_vals.push(x);
});
assert(liveCount === 1, 'Must be alive');
await delay(300);
assert(liveCount === 0, 'Destroyed');
assert(eq(c_vals, [1, 2]), 'c_vals match');
}
async function main() {
await doTest(
'bad: cached value is stale',
testSource().pipe(
publishReplay(1),
refCount()
)
);
await doTest(
'good: But why is this different to publish replay?',
testSource().pipe(
multicast(() => new ReplaySubject(1)),
refCount()
)
);
await doTest(
'bad: But why is this different to the above?',
testSource().pipe(
multicast(new ReplaySubject(1)),
refCount()
)
);
}
main();