Существуют по крайней мере две проблемы с тем, как вы сохранили состояние в вашем операторе custom
.
Первая проблема заключается в том, что ваши действия означают, что оператор больше не является ссылочно-прозрачным.То есть, если вызов оператора заменяется возвращаемым значением оператора, поведение будет другим:
const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
let state = 0;
return pipe(
map(next => state * next),
tap(_ => state += 1),
share()
);
};
const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Вторая проблема, как упоминалось в другом ответе, заключается в том, что разные подписки будут получать разные значения в своих next
уведомлениях в качестве состояниявнутри оператора является общим.
Например, если наблюдаемый источник является синхронным, последовательные подписки будут видеть различные значения:
const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
let state = 0;
return pipe(
map(next => state * next),
tap(_ => state += 1),
share()
);
};
const source = range(1, 2).pipe(custom());
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Однако можно написать оператор, очень похожий на ваш оператор custom
, и он будет работать правильно при любых обстоятельствах.Для этого необходимо убедиться, что любое состояние внутри оператора равно для каждой подписки .
Трубный оператор - это просто функция, которая принимает наблюдаемое и возвращает наблюдаемое, так что вы можетеиспользуйте defer
, чтобы убедиться, что ваш штат соответствует подписке, например:
const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
return source => defer(() => {
let state = 0;
return source.pipe(
map(next => state * next),
tap(_ => state += 1)
);
}).pipe(share());
};
const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>