Есть две проблемы:
1.
subject.doOnNext(result -> Log.d("Subject", "accept: " + result));
В приведенном выше коде результат doOnNext
не подписан.doOnNext
не подписывается на апстрим самостоятельно, как и многие другие операторы.Измените это, например:
subject.doOnNext(result -> Log.d("Subject", "accept: " + result)).subscribe();
2.
observableSubject
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(subject);
observableSubject.onNext(1);
observableSubject.onComplete();
В приведенном выше коде onComplete
вызывается сразу после .onNext
.Это может вызвать проблемы с синхронизацией при отправке значений.
Измените код выше на
observableSubject
.subscribe(subject); // subscribe on the same thread so that everything happens sequentially.
observableSubject.onNext(1);
observableSubject.onComplete();
или
Subject<Integer> observableSubject = BehaviorSubject.create();
observableSubject
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(subject);
observableSubject.onNext(1);
// observableSubject.onComplete(); // don't call onComplete/