Идиоматический способ сделать это в RxJava - использовать hot observable .
Cold observables выполнять некоторые действия, когда кто-то подписывается на них и испускает все элементыдля этого подписчика.Таким образом, это отношение 1 к 1.
Горячая наблюдаемая выполняет некоторые действия и генерирует элементы независимо от индивидуальной подписки.Поэтому, если вы подписываетесь слишком поздно, вы можете не получить некоторые значения, которые были отправлены ранее.Это отношение 1 ко многим, то есть многоадресная рассылка - это то, что вам нужно.
Обычный способ сделать это - Flowable.publish()
, который делает Flowable
многоадресную рассылку, но требует вызова connect()
метод для начала выдачи значений.
В вашем случае вы также можете вызвать refCount()
, который добавляет желаемую функциональность - он подписывается на источник Flowable, когда есть хотя быодна подписка и отписывается, когда все отписались.
Поскольку publish().refCount()
является довольно популярной комбинацией, для них есть ярлык - share()
.И, насколько я понимаю, это именно то, что вам нужно.
Редактировать asker: Этот код включает в себя этот ответ и комментарий Дэвида Карнока в форме метода провайдера Dagger 2.SimpleMatrix
от EJML .Кажется, это делает то, о чем я просил.
@Provides
@Singleton
@Named(MAGNETOMETER)
public Observable<SimpleMatrix> magnetometer(final SensorManager sensorManager) {
final PublishSubject<SimpleMatrix> ps = PublishSubject.create();
final Sensor sensor = sensorManager.getDefaultSensor(TYPE_MAGNETIC_FIELD);
final SensorEventListener listener = new SensorEventAdapter() {
@Override
public void onSensorChanged(final SensorEvent event) {
ps.onNext(new SimpleMatrix(1, 3, true, event.values));
}
};
return ps.doOnSubscribe(s -> {
sensorManager.registerListener(listener, sensor, SENSOR_DELAY_NORMAL);
}).doOnDispose(() -> {
sensorManager.unregisterListener(listener);
}).share();
}