BehaviorSubject подписаться на другой поток - PullRequest
0 голосов
/ 06 января 2019

Я новичок в RxJava и решил использовать его, потому что думал, что он хорошо подойдет для моего варианта использования.

У меня есть Integer значения, которые я хочу наблюдать в течение бесконечного периода времени. Всякий раз, когда изменяется одно из этих значений (то есть событие), я хочу, чтобы все его наблюдатели вызывались в другом потоке .

Из-за большого количества времени наблюдения я думал, что мне нужно использовать класс BehaviorSubject (хотя изначально я думал, что Observable - это все, что мне нужно ... видя, что мне просто нужно "наблюдать"), и я мог используйте метод subscribeOn(), чтобы установить планировщик и, следовательно, добиться вызова подписчиков в фоновом потоке:

private BehaviorSubject<Integer> rotationPositionSubject = BehaviorSubject.createDefault(getRotorPosition());
rotationPositionSubject.subscribeOn(scheduler);

И у меня есть метод rotate(), который я использовал для обновления rotationPositionSubject, который будет вызываться из основного потока:

@Override
public synchronized int rotate()
{
    final int newRotorPosition = super.rotate();
    rotationPositionSubject.onNext(newRotorPosition);

    return newRotorPosition;
}

Однако с помощью приведенного выше кода я обнаружил, что подписчики вызываются в «основном» потоке. Изучение документов для subscribeOn () :

Возвращает:

источник ObservableSource, модифицированный так, чтобы его подписки происходили на указанном планировщике

Таким образом, мой приведенный выше код не будет работать, поскольку я не использую возвращенный ObservableSource, но возвращаемый объект - Observable, который бесполезен для моего приложения?

Тогда возникает вопрос, как мне наблюдать за любым объектом и вызывать подписчиков в фоновом потоке с RxJava, или RxJava - неправильный выбор?

1 Ответ

0 голосов
/ 06 января 2019

После некоторых экспериментов кажется, что при использовании BehaviorSubject объектов нужно немного позаботиться, и их использование не так очевидно, как я понял из названий различных интерфейсов.

В качестве метода тестирования, чтобы продемонстрировать, что я сейчас делаю:

@Test
public void test()
{
    System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
    final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
    rotorBehaviour.subscribeOn(Schedulers.single());
    rotorBehaviour.subscribe(new Observer<Integer>()
    {
        @Override
        public void onSubscribe(final Disposable d)
        {
            System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onNext(final Integer integer)
        {
            System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(final Throwable e)
        {
            System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onComplete()
        {
            System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
        }
    });


    rotorBehaviour.onNext(1);
    rotorBehaviour.onNext(2);

}

Это приводит к нежелательному результату:

Выполнение теста с идентификатором потока: 1
onSubscribe () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1

Процесс завершен с кодом выхода 0

(нежелательно, потому что onNext() вызывается в главном потоке)

Изменение кода для использования Observable, возвращенного из вызова на subscribeOn, дает тот же нежелательный результат:

@Test
public void test()
{
    System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
    final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
    Observable<Integer> rotorObservable = rotorBehaviour.subscribeOn(Schedulers.single());

    rotorObservable.subscribe(new Observer<Integer>()
    {
        @Override
        public void onSubscribe(final Disposable d)
        {
            System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onNext(final Integer integer)
        {
            System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(final Throwable e)
        {
            System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onComplete()
        {
            System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
        }
    });
    rotorBehaviour.onNext(1);
    rotorBehaviour.onNext(2);
}

Результат:

Выполнение теста с идентификатором потока: 1
onSubscribe () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1

Процесс завершен с кодом выхода 0

Однако использование метода observeOn() дает желаемый результат:

@Test
public void test()
{
    System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
    final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
    Observable<Integer>rotorObservable = rotorBehaviour.observeOn(Schedulers.single());

    rotorObservable.subscribe(new Observer<Integer>()
    {
        @Override
        public void onSubscribe(final Disposable d)
        {
            System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onNext(final Integer integer)
        {
            System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(final Throwable e)
        {
            System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onComplete()
        {
            System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
        }
    });
    rotorBehaviour.onNext(1);
    rotorBehaviour.onNext(2);
}

Выполнение теста с идентификатором потока: 1
onSubscribe () вызывается для ID потока: 1
onNext () вызывается для идентификатора потока: 13
onNext () вызывается для идентификатора потока: 13

Процесс завершен с кодом выхода 0

Также во всех примерах я все еще использую объект BehaviorSubject, чтобы инициировать событие, и только случайно обнаружил, что это даст желаемый результат.

Что меня беспокоит, так это то, что я могу использовать Observable и BehaviorSubject неверным способом, который просто «происходит», чтобы дать мне правильный результат, то есть подписчики вызываются в фоновом потоке. Если я не пропустил это где-то в документации, не кажется очевидным, как использовать эти объекты для получения желаемого результата.

...