Наблюдаемый, выполняющийся один раз с несколькими подписчиками - PullRequest
0 голосов
/ 15 мая 2018

У меня есть фрагмент кода, который я хотел бы выполнять периодически, пока все подписчики не отписались.

// This function shall be called *once* per tick,
// no matter the quantity of subscriber.
function doSomething(val) {
    console.log("doing something");
    return val;
}

observable = Rx.Observable.timer(0, 1000).map(val => doSomething(val));

const first = observable.subscribe(val => console.log("first:", val));
const second = observable.subscribe(val => console.log("second:", val));

// After 1.5 seconds, stop first.
Rx.Observable.timer(1500).subscribe(_ => first.unsubscribe());
// After 2.5 seconds, stop second.
Rx.Observable.timer(2500).subscribe(_ => second.unsubscribe());

JSFiddle

Мой ожидаемый результат будет выглядетьчто:

doing something
first: 0
second: 0
doing something
first: 1
second: 1
doing something
second: 2
<nothing more>

Однако функция doSomething вызывается дважды при вызове двух наблюдаемых.Вот фактический результат:

doing something
first: 0
doing something
second: 0
doing something
first: 1
doing something
second: 1
doing something
second: 2
<nothing more>

Я делаю ошибку дизайна?Есть ли способ сделать это?

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Живой рабочий пример .Тогда вам нужно использовать Subjects.Обычные наблюдаемые являются одноадресными (это означает, что каждый подписчик владеет независимым исполнением Observable).Таким образом, каждый наблюдатель получает вызов всей цепочки казни, которую вы имеете.

observable = Rx.Observable.timer(0, 1000)
  .map(val => doSomething(val));

map вызывается для каждого наблюдателя.

Субъекты являютсяспециальный тип Observables, который позволяет значениям быть многоадресными, это означает, что вы разделяете одну строку выполнения вашего Observable.Это rxjs6, если вы заблудились с конвейерными операторами, , пожалуйста, посмотрите здесь .

Прежде всего, получите imports,

import { Observable, Subject, timer } from 'rxjs';
import { map, share } from 'rxjs/operators';

Тогда у вас есть,

const subject = new Subject();

const doSomething = val => {
  console.log("doing something");
  return val;
}

const observable = timer(0, 1000).pipe(
  map(val => doSomething(val)),
).pipe(share());

const first = observable.subscribe(val => console.log("first:", val));
const second = observable.subscribe(val => console.log("second:", val));
const tercer = observable.subscribe(val => console.log("tercer:", val));

// After 1.5 seconds, stop first.
timer(1500).subscribe(_ => first.unsubscribe());
// After 2.5 seconds, stop second.
timer(2500).subscribe(_ => second.unsubscribe());
// After 2.5 seconds, stop second.
timer(2500).subscribe(_ => tercer.unsubscribe());
0 голосов
/ 16 мая 2018

Вы видите правильное поведение. Наблюдаемое, возвращаемое interval, холодно. То есть таймер не создается до тех пор, пока наблюдатель не подпишется, а когда он это делает, таймер, созданный специально для этой подписки.

Ожидаемое поведение может быть реализовано с помощью оператора share:

observable = Rx.Observable
  .timer(0, 1000)
  .map(val => doSomething(val))
  .share();

Справочник по оператору share подсчитывает подписки и осуществляет многоадресную рассылку источника, наблюдаемого для нескольких подписчиков - поэтому будет только один интервал / таймер, общий для двух подписчиков.

Для получения дополнительной информации вы можете найти эту статью полезной.

...