Я думаю, что самый простой вариант - использовать перегрузку subscribe
, которая возвращает Disposable
, и каждый обработчик вызывает соответствующий метод на вашем Subject
, вот так:
Disposable d = observable
.subscribe(subject::onNext, subject::onError, subject::onComplete);
// Later
d.dispose();
Вы также можете создать DisposableObserver
, который пересылает все сообщения на Subject
, и использовать subscribeWith
вместо subscribe
, хотя он более подробный:
Disposable d = observable
.subscribeWith(new DisposableObserver<Integer>() {
@Override public void onStart() {
}
@Override public void onNext(Integer t) {
subject.onNext(t);
}
@Override public void onError(Throwable t) {
subject.onError(t);
}
@Override public void onComplete() {
subject.onComplete();
}
});
Я не знаю никаких более чистых вариантов, и эта проблема из трекера ошибок Rx Java, похоже, подтверждает это, хотя он нацелен на RxJava2.