Если вы уже знакомы с Node.js потоками, я не думаю, что вам будет трудно использовать asp ту же концепцию в Rx Js: сборе данных, поступающем со временем .
Вариант использования этого сценария, в котором я открываю соединение и жду, пока оно установит sh перед обработкой любых входящих фрагментов.
Это можно перевести на Rx Js следующим образом:
interval(500)
.pipe(
take(5),
bufferTime(2000), // It takes 2s to establish the connection
)
.subscribe(console.log)
$
Суффикс используется для обозначения того, что src$
не простая переменная, это наблюдаемая (выдает значения с течением времени).
interval
будет выдавать значения на основе указанного интервала времени, а bufferTime
будет собирать испущенные значения до тех пор, пока не пройдет указанный ms
. Когда это произойдет, вы получите собранные значения в виде массива.
Что мне особенно нравится в Rx Js, так это то, что он поставляется с множеством встроенных операторов, которые позволяют вам манипулировать входящими data.
const src$ = from(['one', 'two', 'tree']);
src$
.pipe(
filter(v => v !== 'one'),
map(v => v.toUpperCase()),
toArray() // Group the values in an array once the source completes
)
.subscribe(val => console.log(val));
StackBlitz .
Чтобы преобразовать обещание в наблюдаемое, достаточно просто:
from(promise).subscribe()
Приведенный выше пример может показаться тривиальным, и я согласен. Вы неизбежно столкнетесь с более сложными ситуациями, и вам будет полезно узнать о наблюдаемых высших порядков .