Делать что-нибудь, когда кто-то подписывается? - PullRequest
0 голосов
/ 28 января 2019

У меня есть широковещательный поток потока данных устройства BLE.

Чтобы вывести данные из этого потока, мне нужно отправить данные на устройство раньше.

Stream<Data> dataStream() {
  sendDataRequestToDevice();
  return _broadcastController.stream;
}

Проблема в том, что все асинхронно, это означает, что когда поток возвращается, очень вероятно, что фактическое событие, отправленное с устройства, уже прошло.Я ищу что-то вроде:

Stream<Data> dataStream() {
  return _broadcastController.stream
    .doOnSubscribe(() => sendDataRequestToDevice()); // stolen from rxjava ;)
}

Есть ли что-то подобное в стандартной библиотеке потоковой передачи без использования RxDart или чего-то подобного.(Я просто не хочу использовать это только для этой цели ...)

Ответы [ 3 ]

0 голосов
/ 28 января 2019

Для всех, кто ищет решение:

Я не считаю это хорошим решением, но оно делает свою работу.Пожалуйста, скажите мне, что есть лучшее решение!

Stream<Data> dataStream() {
  return StreamGroup.merge([
        doOnSubscribe(() => sendDataRequestToDevice()),
        _broadcastController.stream
      ]);
}

Stream doOnSubscribe(void onSubscribe()) {
  StreamController controller;
  controller = StreamController(
    onListen: () {
      onSubscribe();
      controller.close();
    },
  );
  return controller.stream;
}

К сожалению, оно не подходит для моего решения BLE из-за других проблем.

0 голосов
/ 29 января 2019

Если вас волнует, сколько у вас подписчиков, вам, вероятно, не следует использовать трансляцию.Основная идея вещательного потока заключается в том, что он передает , не зная (или не заботясь) о том, кто слушает.Обратные вызовы onListen и onCancel изначально не были частью контроллера вещания, и они слегка нарушают модель.Они позволяют вам знать, когда никто не слушает, но это все.

В такой ситуации я бы создал свой собственный поток, который записывает прослушивания и отмены.

class _ListenStream<T> extends Stream<T> {
  final Stream<T> _source;
  final void Function() _onListen; 
  final void Function() _onCancel;
  _ListenStream(this._source, this._onListen, this._onCancel);
  bool get isBroadcastStream => _source.isBroadcastStream;
  StreamSubscription<T> listen(void Function(T) onData, {
    Function onError, void Function() onDone, bool cancelOnError = false}) {
    if (_onListen != null) _onListen();
    return _ListenSubscription<T>(_source.listen(onData, 
        onError: onError, onDone: onDone, cancelOnError: cancelOnError),
        _onCancel);  
  }
}
class _ListenSubscription<T> extends StreamSubscription<T> {
  final _StreamSubscription<T> _source;
  final void Function() _onCancel;

  void onData(void handleData(T data)) { _source.onData(handleData); }
  void onError(Function handleError) { _source.onError(handleError); }
  void onDone(void handleDone()) { _source.onDone(handleDone); }

  void pause([Future resumeSignal]) { _source.pause(resumeSignal); }
  void resume() { _source.resume(); }
  Future<E> asFuture<E>([E defaultValue]) => _source.asFuture<E>(defaultValue);
  bool get isPaused => _source.isPaused;

  Future cancel() {
    var future = _source.cancel();
    if (_onCancel != null) future = future.whenComplete(_onCancel);
    return future;
  }
}

Затем, вместо предоставления исходного потока, вы предоставляете своим клиентам _ListenStream обертку исходного потока вместе с обратными вызовами, которые вы хотите вызвать при прослушивании и отмене.

0 голосов
/ 28 января 2019

Кажется, вы ищете onListen

https://api.dartlang.org/stable/2.1.0/dart-async/StreamController/onListen.html

_broadcastController.onListen = () {
  _broadcastController.add('foo');
};

Вы также можете передать обработчик в конструктор https://api.dartlang.org/stable/2.1.0/dart-async/StreamController/StreamController.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...