После некоторых экспериментов кажется, что при использовании BehaviorSubject
объектов нужно немного позаботиться, и их использование не так очевидно, как я понял из названий различных интерфейсов.
В качестве метода тестирования, чтобы продемонстрировать, что я сейчас делаю:
@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
rotorBehaviour.subscribeOn(Schedulers.single());
rotorBehaviour.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}
Это приводит к нежелательному результату:
Выполнение теста с идентификатором потока: 1
onSubscribe () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1
Процесс завершен с кодом выхода 0
(нежелательно, потому что onNext()
вызывается в главном потоке)
Изменение кода для использования Observable
, возвращенного из вызова на subscribeOn
, дает тот же нежелательный результат:
@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
Observable<Integer> rotorObservable = rotorBehaviour.subscribeOn(Schedulers.single());
rotorObservable.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}
Результат:
Выполнение теста с идентификатором потока: 1
onSubscribe () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1
onNext () вызывается для идентификатора потока: 1
Процесс завершен с кодом выхода 0
Однако использование метода observeOn()
дает желаемый результат:
@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
Observable<Integer>rotorObservable = rotorBehaviour.observeOn(Schedulers.single());
rotorObservable.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}
Выполнение теста с идентификатором потока: 1
onSubscribe () вызывается для ID потока: 1
onNext () вызывается для идентификатора потока: 13
onNext () вызывается для идентификатора потока: 13
Процесс завершен с кодом выхода 0
Также во всех примерах я все еще использую объект BehaviorSubject
, чтобы инициировать событие, и только случайно обнаружил, что это даст желаемый результат.
Что меня беспокоит, так это то, что я могу использовать Observable
и BehaviorSubject
неверным способом, который просто «происходит», чтобы дать мне правильный результат, то есть подписчики вызываются в фоновом потоке. Если я не пропустил это где-то в документации, не кажется очевидным, как использовать эти объекты для получения желаемого результата.