Как вложить потоки в Dart (сопоставить потоки событиям Stream)? - PullRequest
0 голосов
/ 03 мая 2020

Подобно этому вопросу флаттера Я хочу гнездо Stream с.
В флаттере это может быть легко достигнуто путем вложения StreamBuilder с, однако я не хочу использовать виджеты. Вместо этого я хочу решить проблему только в Dart . (вложение здесь означает, что один поток зависит от значений из другого потока, и их следует объединить)

Позвольте мне проиллюстрировать проблему:

Stream streamB(String a);

streamA: 'Hi' --- 'Hello' ---- 'Hey'

Как видите, у меня есть streamA, который непрерывно излучает события, и streamB, который возникает из событий, которые streamA излучает. В streamC я хочу получать информацию о каждом событии с streamB.

Обычное отображение потока

Если бы у меня было valueB вместо streamB, Я мог бы просто использовать streamA.map((event) => valueB(event)), однако Stream.map может обрабатывать только синхронные значения.
Существует также Stream.asyncMap, однако, это работает только для Future с.
Затем есть также Stream.expand, но это работает только для синхронных итераций.

1 Ответ

0 голосов
/ 03 мая 2020

Stream.asyncExpand

На самом деле существует метод Stream.asyncExpand :

streamC = streamA.asyncExpand((event) => streamB(event));

Однако проблема заключается в том, что Поток результатов (streamC) перейдет к следующему событию в исходном потоке (streamA), только если подпоток (streamB) первого события закрыт. В случае, скажем, Cloud Firestore, это никогда не будет работать, потому что подпоток не закроется.

Stream.concurrentAsyncExpand

К счастью, есть stream_transform пакет !

streamC = streamA.concurrentAsyncExpand((event) => streamB(event));

Этот пакет обеспечивает одновременное расширение asyn c. Таким образом, результирующий поток не ожидает закрытия подпотоков.

Однако есть и обратная сторона: предыдущие подпотоки не закрываются автоматически при получении нового события в исходном потоке.
Таким образом, это также не полезно для Cloud Firestore.

Stream.sequentialAsyncExpand

Официально в пакете stream_transform пока нет, однако вы можете используйте мой PR .

streamC = streamA.sequentialAsyncExpand((event) => streamB(event));

Это решает проблему, которую я изложил выше.

...