Преобразовать поток <Stream <T>> в поток <T> - PullRequest
0 голосов
/ 26 июня 2019

Я использую пакет rxdart для обработки потока в дротике. Я застрял в решении специфической проблемы.

Пожалуйста, посмотрите на этот фиктивный код:

final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));

Теперь я хочу преобразовать переменную oops, чтобы получить только Observable<T>.

Мне трудно ясно объяснить. Но позвольте мне попробовать. У меня есть поток A. Я отображаю каждый вывод потока A в другой поток B. Теперь у меня есть Stream<Stream<B>> - своего рода рекуррентный поток. Я просто хочу выслушать последние значения, полученные с помощью этого шаблона. Как мне этого добиться?

Ответы [ 3 ]

1 голос
/ 26 июня 2019

Редко иметь Stream<Stream<Something>>, поэтому нет особой поддержки.

Одной из причин является то, что существует несколько (по крайней мере, два) способа объединить поток потоков вещей в поток вещей.

  1. Либо вы слушаете каждый поток по очереди, ожидая его завершения, прежде чем начинать следующий, а затем отправляете события по порядку.

  2. Или вы слушаете каждый новый поток в тот момент, когда он становится доступным, а затем отправляете события из любого потока как можно скорее.

Первый легко написать, используя async / await:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Последнее более сложное, поскольку требует одновременного прослушивания нескольких потоков и объединения их событий. (Если бы только StreamController.addStream допускало более одного потока одновременно, это было бы намного проще). Вы можете использовать StreamGroup класс из package:async для этого:

import "package:async/async" show StreamGroup;

Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  var sg = StreamGroup<T>();
  source.forEach(sg.add).whenComplete(sg.close);
  // This doesn't handle errors in [source].
  // Maybe insert 
  //   .catchError((e, s) {
  //      sg.add(Future<T>.error(e, s).asStream())
  // before `.whenComplete` if you worry about errors in [source].          
  return sg.stream;
}
0 голосов
/ 27 июня 2019

После того, как я исследовал несколько методов, чтобы объединить Stream<Stream<T>> в один Stream<T>, я обнаружил, что есть несколько способов сделать это. Здесь я перечислю их:

1. Используя чистый дротик

Как ответил @ Ирн , это чистое решение для дротиков:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
    s.listen(print);
}

Выходы: 1 2 3 4 1 2 3 4 1 2 3 4

2. Использование Observable.flatMap

Observable имеет метод flatMap, который выравнивает выходной поток и присоединяет его к текущему потоку:

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
    s.listen(print);
}

Выходы: 1 2 3 4 1 2 3 4 1 2 3 4

3. Использование Observable.switchLatest

Преобразование потока, который испускает потоки (иначе говоря, «поток более высокого порядка»), в один наблюдаемый объект, который испускает элементы, испускаемые последним из этих потоков.

Это решение, которое я искал! Мне просто нужен последний вывод, испускаемый внутренним потоком.

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.switchLatest(
            Observable.fromIterable(list).map(getStream));
    s.listen(print);
}

Выходы: 1 1 1 2 3 4

0 голосов
/ 26 июня 2019

Если вы хотите, чтобы Stream> возвращал Stream, вам в основном нужно сгладить поток. Функция flatmap - это то, что вы использовали бы здесь

public static void main(String args[]) {
    StreamTest st = new StreamTest<String>();
    List<String> l = Arrays.asList("a","b", "c");
    Stream s = l.stream().map(i -> st.getStream(i)).flatMap(i->i);
}

Stream<T> static getStream(T uid) {
    // a sample code that returns a stream
    return Stream.of(uid);
}

Если вам нужен первый объект, используйте метод findFirst ()

public static void main(String args[]) {
    StreamTest st = new StreamTest<String>();
    List<String> l = Arrays.asList("a","b", "c");
    String str = l.stream().map(i -> st.getStream(i)).flatMap(i->i).findFirst().get();
}
...