CLI Dart: onPause, onResume, onDone не запускается, как ожидалось - PullRequest
0 голосов
/ 16 ноября 2018

Я экспериментирую с Dart и не могу объяснить два наблюдения.

  1. Интересно, почему не запускается обработчик "onDone", назначенный потоковой подписке.
  2. Интересно, почему обработчики "onPause" и "onResume" запускаются только один раз.

Код:

import 'dart:async';
import 'dart:io';

/// This class encapsulates all the necessary data used by the "onValue" event
/// handler (the construct avoids using global variables).
class OnValueHandlerContainer {
  static StreamSubscription<int> _streamSubscriber;

  static setStreamSubscriber(StreamSubscription<int> stream) {
    _streamSubscriber = stream;
  }

  // This static method is the handler executed when a event is received through
  // the stream.
  //
  // WARNING: you have absolutely no idea when this handler will be executed.
  // Do not assume that it will be executed right after the execution of the code
  // that emits an event. It may be executed several lines (of codes) below the
  // line that emits the event. It may well be executed after the end of the
  // script.
  static void onValue(int value) {
    // At this point: the state of the subscription is (inevitably) "active".
    print("onValue: An event has been raised. The associated value is ${value}!");
    print("         Pause the subscription. Wait for 1 second. Resume the subscription");

    // Note 1: once a Dart function starts executing, it continues executing until
    //         it exits. When managing interrupts in C, it is necessary to protect
    //         interrupt handlers from being interrupted. This is not the case in
    //         Dart : a function (and, thus, an event handler) cannot be interrupted
    //         by the occurrence of another event.
    //         => The code below has no sense, other than experimentation.
    // Note 2: while paused, the subscription will not fire any events. If it receives
    //         events from its source, they will be buffered until the subscription
    //         is resumed.
    _streamSubscriber.pause();
    sleep(Duration(seconds: 1));
    _streamSubscriber.resume();

    // At this point: the state of the subscription is "active".
  }
}

main() async {

  // Create a controller.
  // A StreamController gives you a new stream and a way to add events to the stream
  // at any point, and from anywhere. The stream has all the logic necessary to handle
  // listeners and pausing. You return the stream and keep the controller to yourself.
  StreamController<int> sc = StreamController<int>(
      onListen: () => print("Controller: the stream has been assigned a listener!"),
      onCancel: () => print("Controller: the stream has been canceled!"),
      // As you may notice, the event handlers are not executed every time the
      // subscription gets paused or resumed.
      //
      // This behaviour comes from these facts:
      // - Dart is single-threaded.
      // - An event handler cannot be interrupted: once a Dart function starts
      //   executing, it continues executing until it exits. In other words, Dart
      //   functions can’t be interrupted by other Dart code.
      //   See https://webdev.dartlang.org/articles/performance/event-loop
      // - A stream is a FIFO.
      onPause:  () => print("Controller: the stream has been paused!"),
      onResume: () => print("Controller: the stream has been resumed!")
  );

  // Get the stream created by the stream controller.
  // Right now, this stream has no assigned listener.
  Stream<int> stream = sc.stream;
  print("Does the stream provided by the controller have a listener ? ${sc.hasListener ? 'yes' : 'no'} - the answer should be no.");

  // Push values into the stream controlled by the stream controller.
  // Because no listener subscribed to the stream, these values are just stored
  // into the stream.
  for(int i=0; i<3; i++) {
    print("Send the value ${i} into the stream.");
    sc.add(i);
  }

  // Add a listener to the stream.
  // Now the stream has an assigned listener.
  StreamSubscription<int> subscriber = stream.listen(OnValueHandlerContainer.onValue);
  OnValueHandlerContainer.setStreamSubscriber(subscriber);
  subscriber.onDone(() => print("The subscription is done!"));
  print("Does the stream provided by the controller have a listener ? ${sc.hasListener ? 'yes' : 'no'} - the answer should be yes.");

  // Wait for 10 seconds.
  print("Start waiting for 10 seconds");
  Future.delayed(Duration(seconds: 10)).then((var v) => print("10 seconds ellapsed!"));
  print("End of script");
}

Результат:

Does the stream provided by the controller have a listener ? no - the answer should be no.
Send the value 0 into the stream.
Send the value 1 into the stream.
Send the value 2 into the stream.
Controller: the stream has been assigned a listener!
Does the stream provided by the controller have a listener ? yes - the answer should be yes.
Start waiting for 10 seconds
End of script
onValue: An event has been raised. The associated value is 0!
         Pause the subscription. Wait for 1 second. Resume the subscription
Controller: the stream has been paused!
onValue: An event has been raised. The associated value is 1!
         Pause the subscription. Wait for 1 second. Resume the subscription
onValue: An event has been raised. The associated value is 2!
         Pause the subscription. Wait for 1 second. Resume the subscription
Controller: the stream has been resumed!
10 seconds ellapsed!

В основном предоставленный код выполняет следующие действия:

  • Создан контроллер потока.
  • 3 события вводятся в поток, предоставленный контроллером.
  • Прослушиватель подписывается на поток, предоставленный контроллером.
  • Мы назначаем обработчик "onDone" для подписчика прослушивателя.
  • Внутри прослушивателя потока (OnValueHandlerContainer::onValue) мы приостанавливаем и возобновляем подписку.

Потоковый приемник запускается 3 раза, как и ожидалось.

Однако:

  • the "onDone"обработчик никогда не выполняется.Я ожидаю, что он будет выполнен в конце выполнения скрипта, пока контроллер уничтожается (и, следовательно, подписка закрывается).
  • обработчики "onPause" и "onResume"запустить только один раз.Я ожидаю, что они будут выполнены 3 раза.

Есть идеи?

1 Ответ

0 голосов
/ 16 ноября 2018

Причина, по которой вы не получаете событие "done", заключается в том, что вы никогда не close подписываете поток.

Причина, по которой вы не получаете больше событий "паузы", заключается в том, что потоковая подписка умная .

Первое, что вы делаете, это добавляете много событий, прежде чем кто-либо даже послушает поток. Вы никогда не должны делать это в реальном коде, вместо этого начинайте добавлять события только при вызове onListen и останавливайтесь снова при вызове onPause, пока подписка не будет возобновлена.

Здесь потоковая подписка заполняется рядом событий, затем она доставляет одно событие, а затем подписка приостанавливается. Подписка покорно сообщает об этом обратно контроллеру. Затем подписка получает резюме. Вот где это становится умным. Поскольку у него уже есть события для доставки, он не сообщает о возобновлении обратно контроллеру. На самом деле он не хочет больше событий прямо сейчас, есть что доставить. И поэтому он доставляет буферизованные события с интервалом в одну секунду, пока буфер не станет пустым. В этой точке он сообщает о возобновлении обратно контроллеру.

Контроллер сообщает, что работа возобновлена, но поскольку никто не добавляет больше событий, и никто не вызывает close, больше ничего не произойдет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...