Наблюдаемый с Stateful Lifecycle - PullRequest
0 голосов
/ 02 марта 2019

Общая проблема проектирования может быть описана следующим образом:

У меня есть соединение с веб-сокетом, для которого требуется соблюдение строгого жизненного цикла - оно хочет, чтобы connect и disconnect вызывались надлежащим образом, и, поскольку оно говоритв систему, он использует.В этом соединении с веб-сокетом у нас есть несколько различных объектов подписки, каждый из которых имеет строгий жизненный цикл, который он хочет соблюдать (subscribe и unsubscribe), и для успешной работы этих операций зависит состояние его родительского веб-сокета.

Вот график идеального поведения для трех вложенных наблюдаемых жизненного цикла, где C зависит от B, который зависит от A:


A = someInput.switchMap((i) => LifecycleObservable())
B = A.switchMap((a) => LifecycleObservable())
C = B.switchMap((b) => LifecycleObservable())

C.listen(print);

// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced

// <-- c is unsubscribed
teardown C
teardown B
teardown A

// <-- C is re-subscribed-to
setup A
setup B
setup C

// <-- produce [someInput]
teardown C
teardown B
teardown A
setup A
setup B
setup C
// <-- c is produced

Первый вопрос: Это анти-паттерн?Я не смог найти много об этом шаблоне в сети, но это похоже на довольно стандартную вещь, с которой вы столкнулись бы с наблюдаемыми: некоторые объекты просто имеют жизненный цикл, а некоторые объекты могут захотеть зависеть от этого.

Я могу довольно близко приблизиться к этому идеальному поведению, используя что-то вроде этого:

class LifecycleObservable {
  static Observable<T> fromObservable<T>({
    @required Observable<T> input,
    @required Future<void> Function(T) setup,
    @required Future<void> Function(T) teardown,
  }) {
    return input.asyncMap((T _input) async {
      await setup(_input);
      return _input;
    }).switchMap((T _input) {
      return Observable<T>(Observable.never()) //
          .startWith(_input)
          .doOnCancel(() async {
        await teardown(_input);
      });
    });
  }
}

Этот код принимает поток объектов с состоянием, запускает на них setup по мере их создания иteardown для них как суб-наблюдаемого в switchMap отменяется.

Проблема возникает, когда в исходной идеализированной временной шкале создается вторая [someInput]: используя код выше, я получаюcallgraph, такой как

// <-- listen to c
// <-- produce [someInput]
setup A
setup B
setup C
// <-- c is produced

// <-- produce [someInput]
teardown A
setup A
teardown B
setup B
teardown C
setup C
// <-- c is produced

, проблема в том, что если B зависит от A (как, например, вызов unsubscribe из подписки, которая зависит от открытого транспорта веб-сокетов), этот порядок разрыва нарушает ожидаемый жизненный цикл каждого объекта (подписка пытается отправить unsubscribe по закрытому транспорту веб-сокета.

1 Ответ

0 голосов
/ 06 марта 2019

Мне кажется, что просто, наблюдаемый паттерн не может выразить эту семантику .В частности, наблюдаемый шаблон не предназначен для каскадных зависимостей - родительские наблюдаемые ничего не знают о состоянии своих дочерних наблюдаемых.

Я решил эту проблему для себя с помощью следующего кода дротика.Я уверен, что это ужасно, но мне кажется, что в целом это работает ™.


class WithLifecycle<T> {
  final FutureOr<void> Function() setup;
  final FutureOr<void> Function() teardown;
  final T value;
  final WithLifecycle parent;

  List<WithLifecycle> _children = [];
  bool _disposed = false;
  WithLifecycle({
    @required this.value,
    this.setup,
    this.teardown,
    this.parent,
  });

  void addDependency(WithLifecycle child) => _children.add(child);
  void removeDependency(WithLifecycle child) => _children.remove(child);
  Future<void> init() async {
    parent?.addDependency(this);
    await setup();
  }

  Future<void> dispose() async {
    if (_disposed) {
      return;
    }

    _disposed = true;
    for (var _child in _children) {
      await _child.dispose();
    }
    _children.clear();
    await teardown();
  }
}

, который затем используется для создания необходимой цепочки зависимостей при использовании наблюдаемых:

class LifecycleObservable {
  static Observable<WithLifecycle<T>> fromObservable<T>({
    @required Observable<T> value,
    WithLifecycle parent,
    @required Future<void> Function(T) setup,
    @required Future<void> Function(T) teardown,
  }) {
    return value.concatMap((T _value) {
      final withLifecycle = WithLifecycle<T>(
        value: _value,
        parent: parent,
        setup: () => setup(_value),
        teardown: () => teardown(_value),
      );
      return Observable<WithLifecycle<T>>(Observable.never())
          .startWith(withLifecycle)
          .doOnListen(() async {
        await withLifecycle.init();
      }).doOnCancel(() async {
        await withLifecycle.dispose();
      });
    });
  }
}

который используется как

token$ = PublishSubject();
    channel$ = token$.switchMap((token) {
      return LifecycleObservable.fromObservable<IOWebSocketChannel>(
          value: Observable.just(IOWebSocketChannel.connect(Constants.connectionString)),
          setup: (channel) async {
            print("setup A ${channel.hashCode}");
          },
          teardown: (channel) async {
            print("teardown A ${channel.hashCode}");
            await channel.sink.close(status.goingAway);
          });
    });

    streams$ = channel$.switchMap((channel) {
      return LifecycleObservable.fromObservable<Stream<String>>(
        parent: channel,
        value: Observable.just(channel.value.stream.cast<String>()),
        setup: (thing) async {
          print("setup B ${thing.hashCode}");
        },
        teardown: (thing) async {
          print("teardown B ${thing.hashCode}");
        },
      );
    });

    messages = streams$.flatMap((i) => i.value).share();

и заканчивается графиком вызовов, как показано ниже

// <- push [token]
flutter: setup A 253354366
flutter: setup B 422603720
// <- push [token]
flutter: teardown B 422603720
flutter: teardown A 253354366
flutter: setup A 260164938
flutter: setup B 161253018
...