RX: Обработка ошибок в combList - PullRequest
0 голосов
/ 26 декабря 2018

Я использую combinedLatest2 в RxDart, но я все еще смущен этим.Это мой код:

final validator = StreamTransformer<String, String>.fromHandlers(
    handleData: (data, sink) =>
        data.isNotEmpty ? sink.add(data) : sink.addError('Cannot be empty.'));

final _subject1 = BehaviorSubject<String>();
final stream1 = _subject1.stream.transform(validator);
final changeSubject1 = _subject1.sink.add;

final _subject2 = BehaviorSubject<String>();
final stream2 = _subject2.stream.transform(validator);
final changeSubject2 = _subject2.sink.add;

final combined =
    Observable.combineLatest2(stream1, stream2, (a, b) => '$a, $b');

У меня есть две темы, из которых я получил ссылки на их потоки и sink.add функции.Перед тем, как назначить потоки их соответствующим переменным, я добавил в преобразователь, который обеспечивает отправку непустой строки, в противном случае в приемник будет добавлена ​​ошибка.Наконец, я создал другой поток, combined, скомбинировав первые два, используя Observable.combineLatest2.

Поток combined будет излучать только тогда, когда его "дочерние" потоки имеют допустимые значения.Проблема, с которой я здесь сталкиваюсь, возникает, когда два потока уже отправили допустимые значения, затем оба отправили недопустимые значения, а затем один из них выдал допустимое значение.Интересно, что для последнего выброса поток combined также испускает новое значение вновь обновленного потока и предыдущее допустимое значение другого (хотя недопустимое значение уже было отправлено после предыдущего действительного).Могу ли я предотвратить это?Другими словами, выполнение этого кода:

combined.listen((data) => print(data), onError: (error) => print('error'));

changeSubject1('Hello');
changeSubject2('World');
changeSubject1('');
changeSubject2('');
changeSubject1('NewWorld');

сгенерирует этот вывод:

Hello, World
error
error
NewHello, World
NewHello, NewWorld

Вывод, который я пытаюсь достичь:

Hello, World
error
error
NewHello, NewWorld

Таким образом, я пытаюсь заставить поток combined излучать только тогда, когда допустимы последние значения каждого потока.

1 Ответ

0 голосов
/ 29 декабря 2018

Я смог получить то, что хотел, создав «1001 * вариационную функцию с учетом ошибок».Оригинальная функция combineList работает путем создания нового Observable с использованием потока CombineLatestStream.Я создал новый класс потока, а именно ErrorAwareCombineLatestStream, который имеет почти ту же реализацию, что и CombineLatestStream.Я добавил только пару строк, которые сохраняют состояние ошибки каждого потока и выдают только тогда, когда все ошибки устранены.

Вот моя реализация:

Класс ErrorAwareCombineLatestStream:

class ErrorAwareCombineLatestStream<T, A, B, C, D, E, F, G, H, I>
    extends Stream<T> {
  final StreamController<T> controller;

  ErrorAwareCombineLatestStream(Iterable<Stream<dynamic>> streams,
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]))
      : controller = _buildController(streams, combiner);

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return controller.stream.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }

  static StreamController<T> _buildController<T, A, B, C, D, E, F, G, H, I>(
      Iterable<Stream<dynamic>> streams,
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i])) {
    final List<StreamSubscription<dynamic>> subscriptions =
        new List<StreamSubscription<dynamic>>(streams.length);
    StreamController<T> controller;

    controller = new StreamController<T>(
        sync: true,
        onListen: () {
          final List<dynamic> values = new List<dynamic>(streams.length);
          final List<bool> triggered =
              new List<bool>.generate(streams.length, (_) => false);
          final List<bool> completedStatus =
              new List<bool>.generate(streams.length, (_) => false);
          final List<bool> hasError =
              new List<bool>.generate(streams.length, (_) => false);

          for (int i = 0, len = streams.length; i < len; i++) {
            Stream<dynamic> stream = streams.elementAt(i);
            subscriptions[i] = stream.listen((dynamic value) {
              values[i] = value;
              triggered[i] = true;
              hasError[i] = false;

              final allStreamsHaveEvents =
                  triggered.reduce((bool a, bool b) => a && b) &&
                      !hasError.reduce((a, b) => a || b);

              if (allStreamsHaveEvents)
                updateWithValues(combiner, values, controller);
            }, onError: (e) {
              hasError[i] = true;
              controller.addError(e);
            }, onDone: () {
              completedStatus[i] = true;

              if (completedStatus.reduce((bool a, bool b) => a && b))
                controller.close();
            });
          }
        },
        onCancel: () => Future.wait<dynamic>(subscriptions
            .map((StreamSubscription<dynamic> subscription) =>
                subscription.cancel())
            .where((Future<dynamic> cancelFuture) => cancelFuture != null)));

    return controller;
  }

  static void updateWithValues<T, A, B, C, D, E, F, G, H, I>(
      T combiner(A a, B b, [C c, D d, E e, F f, G g, H h, I i]),
      Iterable<dynamic> values,
      StreamController<T> controller) {
    try {
      final int len = values.length;
      final A a = values.elementAt(0);
      final B b = values.elementAt(1);
      T result;

      switch (len) {
        case 2:
          result = combiner(a, b);
          break;
        case 3:
          final C c = values.elementAt(2);

          result = combiner(a, b, c);
          break;
        case 4:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);

          result = combiner(a, b, c, d);
          break;
        case 5:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);

          result = combiner(a, b, c, d, e);
          break;
        case 6:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);

          result = combiner(a, b, c, d, e, f);
          break;
        case 7:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          final G g = values.elementAt(6);

          result = combiner(a, b, c, d, e, f, g);
          break;
        case 8:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          final G g = values.elementAt(6);
          final H h = values.elementAt(7);

          result = combiner(a, b, c, d, e, f, g, h);
          break;
        case 9:
          final C c = values.elementAt(2);
          final D d = values.elementAt(3);
          final E e = values.elementAt(4);
          final F f = values.elementAt(5);
          final G g = values.elementAt(6);
          final H h = values.elementAt(7);
          final I i = values.elementAt(8);

          result = combiner(a, b, c, d, e, f, g, h, i);
          break;
      }

      controller.add(result);
    } catch (e, s) {
      controller.addError(e, s);
    }
  }
}

Функция errorAwareCombineLatest2:

Observable<T> errorAwareCombineLatest2<A, B, T>(
        Stream<A> streamOne, Stream<B> streamTwo, T combiner(A a, B b)) =>
    new Observable<T>(new ErrorAwareCombineLatestStream<T, A, B, Null, Null,
            Null, Null, Null, Null, Null>(
        <Stream<dynamic>>[streamOne, streamTwo],
        (A a, B b, [Null c, Null d, Null e, Null f, Null g, Null h, Null i]) =>
            combiner(a, b)));
...