mergeMap
, как и многие другие так называемые операторы отображения более высокого порядка, поддерживает один или несколько внутренних наблюдаемых.
внутреннее наблюдаемое создается с помощью внешнее значение и предоставленная функция . Внешнее значение по сути - это просто значение, полученное от его источника. Например:
of(1, 2, 3).pipe(
mergeMap((outerValue, index) => /* ... return an observable ... */)
).subscribe(); // `outerValue`: 1, 2, 3 (separately)
Когда приходит внешнее значение, будет создана новая внутренняя наблюдаемая . Я думаю, лучший способ понять это - взглянуть на исходный код :
// `value` - the `outerValue`
protected _next(value: T): void {
if (this.active < this.concurrent) {
this._tryNext(value);
} else {
this.buffer.push(value);
}
}
protected _tryNext(value: T) {
let result: ObservableInput<R>;
const index = this.index++;
try {
// Create the inner observable based on the `outerValue` and the provided function (`this.project`)
// `mergeMap(project)`
result = this.project(value, index);
} catch (err) {
this.destination.error(err);
return;
}
this.active++;
// Subscribe to the inner observable
this._innerSub(result, value, index);
}
Пожалуйста, пока не обращайте внимания на concurrent
и buffer
, у нас будет посмотрите на них чуть позже.
Теперь, что происходит, когда излучается внутреннее наблюдаемое? Прежде чем идти дальше, стоит упомянуть, что, хотя это очевидно, для внутреннего наблюдаемого требуется внутренний подписчик . Мы можем увидеть это в методе _innerSub
сверху:
private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
const innerSubscriber = new InnerSubscriber(this, value, index);
const destination = this.destination as Subscription;
destination.add(innerSubscriber);
// This is where the subscription takes place
subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}
Когда излучается внутренний наблюдаемый объект, будет вызван метод notifyNext
:
notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this.destination.next(innerValue);
}
Где пункт назначения указывает на следующего абонента в цепочке. Например, это может быть следующее:
of(1)
.pipe(
mergeMap(/* ... */)
)
.subscribe({} /* <- this is the `destination` for `mergeMap` */)
Это будет более подробно объяснено в Что насчет следующего абонента в цепочке ниже.
Итак, что означает to mix 2 observables
?
Давайте посмотрим на этот пример:
of(2, 3, 1)
.pipe(
mergeMap(outerValue => timer(outerValue).pipe(mapTo(outerValue)))
)
.subscribe(console.log)
/* 1 \n 2 \n 3 */
Когда приходит 2
, mergeMap
подписывается на внутреннее наблюдаемое, которое выдаст через 200
мс. Это асинхронное действие, но обратите внимание, что внешние значения (2, 3, 1) прибывают синхронно. Затем прибывает 3
и создаст внутренний obs. который будет выдавать через 300
мс. Поскольку текущий скрипт еще не завершен, очередь обратного вызова еще не рассматривается. Теперь 1
прибывает и создаст внутренний obs. который будет испускаться за 100
мс.
mergeMap
теперь имеет 3 внутренних наблюдаемых и будет передавать внутреннее значение того, что излучает внутреннее наблюдаемое. Как и ожидалось, получаем 1
, 2
, 3
.
Вот что делает mergeMap
. Смешивание наблюдаемых можно рассматривать так: если приходит внешнее значение и внутреннее наблюдаемое уже создано, то mergeMap
просто говорит: «Нет проблем, я просто создам новые внутренние obs. и подпишитесь на него ".
А как насчет concurrent
и buffer
mergeMap
может быть дан второй аргумент concurrent
, который указывает, сколько внутренних наблюдаемых объектов должно обрабатываться в то же время. Это количество активных внутренних наблюдаемых отслеживается с помощью свойства active
.
Как видно в методе _next
, если active >= concurrent
, outerValues
будет добавлено к buffer
, которое является queue (FIFO
).
Затем, когда одна активная внутренняя наблюдаемая завершит , mergeMap
возьмет самое старое значение из значения и создаст из него внутреннюю наблюдаемую, используя предоставленная функция:
// Called when an inner observable completes
notifyComplete(innerSub: Subscription): void {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift()!); // Create a new inner obs. with the oldest buffered value
} else if (this.active === 0 && this.hasCompleted) {
this.destination.complete();
}
}
Имея это в виду, concatMap(project)
это просто mergeMap(project, 1)
.
Итак, если у вас есть:
of(2, 3, 1)
.pipe(
mergeMap(outerValue => timer(outerValue * 100).pipe(mapTo(outerValue)), 1)
)
.subscribe(console.log)
, это будет быть зарегистрированным:
2 \n 3 \n 1
.
А как насчет следующего абонента в цепочке
Операторы функции которые return другая функция , которая принимает observable как свой единственный параметр и return другой observable . Когда поток подписывается, каждая наблюдаемая, возвращаемая оператором, будет иметь своего собственного подписчика .
Все эти подписчики могут отображаться как связанный список. Например:
// S{n} -> Subscriber `n`, where `n` depends on the order in which the subscribers are created
of(/* ... */)
.pipe(
operatorA(), // S{4}
operatorB(), // S{3}
operatorC(), // S{2}
).subscribe({ /* ... */ }) // S{1}; the observer is converted into a `Subscriber`
S{n}
- родительский ( пункт назначения ) для S{n+1}
, что означает, что S{1}
- это пункт назначения S{2}
, S{2}
- это пункт назначения S{3}
и т. д.
StackBlitz
Неожиданные результаты
Сравните следующие:
of(2, 1, 0)
.pipe(
mergeMap(v => timer(v * 100).pipe(mapTo(v)))
).subscribe(console.log)
// 0 1 2
of(2, 1, 0)
.pipe(
mergeMap(v => timer(v).pipe(mapTo(v)))
).subscribe(console.log)
// 1 0 2
Согласно MDN :
Указанное количество времени (или задержка) - это не гарантированное время выполнения, а скорее минимальное время выполнения. Обратные вызовы, которые вы передаете этим функциям, не могут выполняться, пока стек в основном потоке не станет пустым.
Как следствие, такой код, как setTimeout (fn, 0), будет выполняться, как только стек станет пустым, а не сразу. Если вы выполните такой код, как setTimeout (fn, 0), но сразу после выполнения al oop, который насчитывает от 1 до 10 миллиардов, ваш обратный вызов будет выполнен через несколько секунд.
Этот раздел от MDN также должен прояснить ситуацию.
Я бы сказал, что это зависит от среды c, а не Rx Js -specifi c.
Во втором фрагменте задержки идут последовательно, поэтому вы получаете неожиданные результаты . Если вы немного увеличите задержки, например: timer(v * 2)
, вы должны получить ожидаемое поведение.