Как создать наблюдаемую многоадресную рассылку, которая активируется при подписке? - PullRequest
0 голосов
/ 24 июня 2018

Я хочу объединить входы нескольких датчиков Android и выставить выход как наблюдаемое (или, по крайней мере, то, на что можно подписаться), которое поддерживает несколько одновременных наблюдателей.Какой идиоматический способ подойти к этому?Есть ли в стандартной библиотеке класс, который бы послужил хорошей отправной точкой?

Я думал об обёртывании PublishSubject в объекте с делегатами для одного или нескольких subscribe методов, которые проверяют hasObservers наактивируйте датчики и оберните возвращенный Disposable в прокси, который проверяет hasObservers, чтобы отключить их.Примерно так, хотя уже есть некоторые очевидные проблемы:

public class SensorSubject<T> {
    private final PublishSubject<T> mSubject = PublishSubject.create();

    public Disposable subscribe(final Consumer<? super T> consumer) {
        final Disposable d = mSubject.subscribe(consumer);
        if(mSubject.hasObservers()) {
            // activate sensors
        }
        return new Disposable() {
            @Override
            public void dispose() {
                // possible race conditions!
                if(!isDisposed()) {
                    d.dispose();
                    if(!mSubject.hasObservers()) {
                        // deactivate sensors
                    }
                }
            }

            @Override
            public boolean isDisposed() {
                return d.isDisposed();
            }
        };
    }
}

1 Ответ

0 голосов
/ 25 июня 2018

Идиоматический способ сделать это в RxJava - использовать hot observable .

Cold observables выполнять некоторые действия, когда кто-то подписывается на них и испускает все элементыдля этого подписчика.Таким образом, это отношение 1 к 1.

Горячая наблюдаемая выполняет некоторые действия и генерирует элементы независимо от индивидуальной подписки.Поэтому, если вы подписываетесь слишком поздно, вы можете не получить некоторые значения, которые были отправлены ранее.Это отношение 1 ко многим, то есть многоадресная рассылка - это то, что вам нужно.

Обычный способ сделать это - Flowable.publish(), который делает Flowable многоадресную рассылку, но требует вызова connect() метод для начала выдачи значений.

В вашем случае вы также можете вызвать refCount(), который добавляет желаемую функциональность - он подписывается на источник Flowable, когда есть хотя быодна подписка и отписывается, когда все отписались.

Поскольку publish().refCount() является довольно популярной комбинацией, для них есть ярлык - share().И, насколько я понимаю, это именно то, что вам нужно.

Редактировать asker: Этот код включает в себя этот ответ и комментарий Дэвида Карнока в форме метода провайдера Dagger 2.SimpleMatrix от EJML .Кажется, это делает то, о чем я просил.

@Provides
@Singleton
@Named(MAGNETOMETER)
public Observable<SimpleMatrix> magnetometer(final SensorManager sensorManager) {
    final PublishSubject<SimpleMatrix> ps = PublishSubject.create();
    final Sensor sensor = sensorManager.getDefaultSensor(TYPE_MAGNETIC_FIELD);
    final SensorEventListener listener = new SensorEventAdapter() {
        @Override
        public void onSensorChanged(final SensorEvent event) {
            ps.onNext(new SimpleMatrix(1, 3, true, event.values));
        }
    };
    return ps.doOnSubscribe(s -> {
        sensorManager.registerListener(listener, sensor, SENSOR_DELAY_NORMAL);
    }).doOnDispose(() -> {
        sensorManager.unregisterListener(listener);
    }).share();
}
...