Если вас волнует, сколько у вас подписчиков, вам, вероятно, не следует использовать трансляцию.Основная идея вещательного потока заключается в том, что он передает , не зная (или не заботясь) о том, кто слушает.Обратные вызовы onListen
и onCancel
изначально не были частью контроллера вещания, и они слегка нарушают модель.Они позволяют вам знать, когда никто не слушает, но это все.
В такой ситуации я бы создал свой собственный поток, который записывает прослушивания и отмены.
class _ListenStream<T> extends Stream<T> {
final Stream<T> _source;
final void Function() _onListen;
final void Function() _onCancel;
_ListenStream(this._source, this._onListen, this._onCancel);
bool get isBroadcastStream => _source.isBroadcastStream;
StreamSubscription<T> listen(void Function(T) onData, {
Function onError, void Function() onDone, bool cancelOnError = false}) {
if (_onListen != null) _onListen();
return _ListenSubscription<T>(_source.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError),
_onCancel);
}
}
class _ListenSubscription<T> extends StreamSubscription<T> {
final _StreamSubscription<T> _source;
final void Function() _onCancel;
void onData(void handleData(T data)) { _source.onData(handleData); }
void onError(Function handleError) { _source.onError(handleError); }
void onDone(void handleDone()) { _source.onDone(handleDone); }
void pause([Future resumeSignal]) { _source.pause(resumeSignal); }
void resume() { _source.resume(); }
Future<E> asFuture<E>([E defaultValue]) => _source.asFuture<E>(defaultValue);
bool get isPaused => _source.isPaused;
Future cancel() {
var future = _source.cancel();
if (_onCancel != null) future = future.whenComplete(_onCancel);
return future;
}
}
Затем, вместо предоставления исходного потока, вы предоставляете своим клиентам _ListenStream
обертку исходного потока вместе с обратными вызовами, которые вы хотите вызвать при прослушивании и отмене.