Объединение потоков фьючерсов в дартс - PullRequest
1 голос
/ 23 января 2020

Объединив несколько потоков с преобразователями потока и асинхронной c функцией, которая возвращает будущее, я получаю поток будущего.

Я могу использовать что-то вроде выравнивания потока будущего:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) yield* Stream.fromFuture(future);
}

Мне кажется, это немного излишне, чтобы преобразовать Будущее в поток, но я не нашел ни одного элегантного решения. Конечно, я мог бы передать его другому наблюдателю со слушателем, но я подозреваю, что просто упустил более простой подход?

Ответы [ 2 ]

1 голос
/ 23 января 2020

Прежде всего, поток фьючерсов опасен . Я настоятельно рекомендую избегать этого, если это абсолютно возможно.

Для вашего текущего примера, , если в вашем фьючерсе нет ошибок, вы можете напрямую ожидать будущего вместо преобразования его в поток:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) yield await future;
}

Это в значительной степени самый идиоматический c способ ожидания будущего потока, затем ожидания каждого будущего и, наконец, создания потока будущих результатов. Тем не менее, он сломается при ошибке first . Если в вашем будущем нет ошибок, это нормально, и вы можете перестать читать здесь.

Если вы действительно хотите пересылать все ошибки из фьючерса в результирующий поток, тогда ваш исходный код будет идеальным! Единственный способ для функции async* выдать ошибку в потоке, не закрывая поток, - это yield* поток, содержащий ошибку.

Альтернативная версия, оптимизированная для удобства чтения, а не краткости, может быть:

Stream<T> flattenStreamsOfFutures<T>(Stream<Future<T>> source) async* {
  await for (var future in source) {
    try {
      yield await future;
    } catch (error, stack) {
      yield* Stream.error(error, stack);
    }
  }
}

, но результат такой же, как при использовании Stream.fromFuture.

Опасность, о которой я говорил здесь, заключается в том, что пока вы ожидаете этого будущего (или yield* в этом потоке), вы приостанавливаете поток, который слушаете. Это само по себе обычно не является проблемой, поскольку потоки создаются для приостановки. Если поступает больше данных, они просто буферизируются. Однако, когда данные представляют собой будущее, и это будущее может завершиться с ошибкой, задержка доставки означает, что у вас может не быть времени для добавления обработчика ошибок в будущее, прежде чем оно завершится с ошибкой. Это сделает ошибку необработанной ошибкой , которая может взломать sh вашего приложения. Итак, это плохо.

Итак, если у вас могут быть фьючерсы на ошибки в вашем потоке, вам нужно как можно быстрее добраться до этих . Даже ожидание потока, чтобы доставить будущее, может быть слишком поздно, поэтому у вас вообще не должно быть Stream<Future>. Чтобы избежать проблем, вы должны быть чрезвычайно умны (вы должны доставлять фьючерсы синхронно, и вы должны немедленно реагировать на паузы и отмены, и вы не должны вводить другие задержки между созданием будущего и его доставкой.)

Я рекомендую переписать код, который создает Stream<Future<T>>, чтобы просто создать Stream<T>. Я не могу сказать, как это сделать, не видя исходного кода, но, вероятно, вы сразу дождетесь будущего, где он будет создан с использованием функции, которая возвращает фьючерсы, а затем отправите значение или ошибку в поток в качестве события потока, а точнее чем отправка самого будущего.

0 голосов
/ 23 января 2020

Ваш подход заключается в том, что возвращаемый поток не будет возвращать ничего, пока не завершится все фьючерсы.

Существует более идиоматический c способ создания потока из серии фьючерсов, хотя : Stream.fromFutures.

Stream.fromFutures([
  Future.delayed(Duration(seconds: 3), () => 'a'),
  Future.delayed(Duration(seconds: 2), () => 'b'),
  Future.delayed(Duration(milliseconds: 1500), () => 'c'),
]).listen(print);

Если входными данными должен быть поток фьючерсов, а не список, в vanilla Dart нет простого способа сделать это. Этот подход должен, по крайней мере, заставить его вести себя как ожидалось.

Stream<T> flattenFutures<T>(Stream<Future<T>> source) {
  final controller = StreamController<T>();
  source.listen((future) => future.then((value) => controller.add(value)));
  return controller.stream;
}
...