Интересная проблема!
Итак, windowTime(1000)
будет выдавать новое окно каждые 1000 мс. Но что такое окно? Окно имеет значение Subject
instance.
windowTime
может управлять несколькими windows, но когда вы предоставляете только первый аргумент (называемый windowTimeSpan
), будет только один активное окно . Это означает, что после прохода windowTimeSpan
мс текущее окно будет закрыто , а новое будет создано и помещено в поток.
Когда окно закрыто , это означает, что оно отправит полное уведомление . Это очень важный аспект.
Когда вы подписываетесь, будет создано окно немедленно и отправлено в поток.
Выполнив flatMap(value => value.pipe(toArray())))
, вы можете зарегистрировать наблюдателей. для текущего окна (темы). Семантически он такой же, как subject.pipe(toArray()).subscribe(subscriber)
Почему он так себя ведет?
timer(0, 100)
.pipe(
// Emit a value(window) every 1000ms and close(complete) the prev one
windowTime(1000),
take(3),
flatMap( value => value.pipe(toArray()))
)
Во-первых, давайте посмотрим на flatMap
. flatMap
совпадает с mergeMap
. mergeMap
управляет числом (concurrent
, по умолчанию INFINITY
) внутренних наблюдаемых. Внутренняя наблюдаемая отслеживается до тех пор, пока completes
.
Что toArray
делает для накопления значений до ее источника completes
. В этом случае до текущего subject
(окно) completes
. Это происходит, когда окно закрывается, а точнее, когда проходит 1000ms
.
Таким образом, как было указано ранее, окно будет создано сразу после подписки. После 0ms
приходит значение (0), после 100ms
другое значение (1) и т. Д., Пока не появится значение 9
. Между тем все эти значения были собраны toArray
. Таким образом, прибытие 9 также отмечает 1000ms
, то есть, когда текущее окно будет закрыто (будет выдано complete notification
). Когда это происходит, toArray
получит уведомление и отправит собранные значения потребителю данных.
После этого создается новое окно (второе значение для take(3)
). Затем приходит значение 10, затем значение 11 и так далее до 19, что означает еще один 1000ms
, что приведет к завершению текущего окна и созданию нового. Но это новое окно будет представлять 3-е значение для take(3)
. Это означает, что take
откажется от подписки на источник и отправит полное уведомление.
В результате источник не сможет получать другие значения, поэтому это должно объяснить, почему вы получаете только 2 массива.