Consumer.accept () не вызывается для субъекта в doOnNext () - PullRequest
0 голосов
/ 23 сентября 2019

У меня есть два Subject s, один подписывается на другой для получения обновлений.

Subject<Integer> subject = new Subject<>() {
    @Override
    public boolean hasObservers() {
        return false;
    }

    @Override
    public boolean hasThrowable() {
        return false;
    }

    @Override
    public boolean hasComplete() {
        return false;
    }

    @Override
    public Throwable getThrowable() {
        return null;
    }

    @Override
    protected void subscribeActual(Observer<? super InitialAPIResponse> observer) {

    }

    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer result) {
        Log.d(TAG, "onNext: " + apiResponse);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

subject.doOnNext(result -> Log.d("Subject", "accept: " + result));

observableSubject
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(subject);

observableSubject.onNext(1);
observableSubject.onComplete();

Когда вызывается onNext(), Consumer accept() предоставляется в doOnNext()не является.Хотя согласно документации

Observable.doOnNext ()

Изменяет источник ObservableSource, чтобы он вызывал действие при вызове onNext.

Планировщик: doOnNext не работает по умолчанию для определенного Scheduler

onNext действие, которое вызывается, когда источник ObservableSource вызывает onNext

возврат источник ObservableSource с примененным побочным поведением

Из того, что я понимаю из документации, наблюдаемое должно вызвать Consumer в doOnNext().

Я учусь RxJava поэтому, может быть, я здесь что-то не так делаю ...

1 Ответ

0 голосов
/ 24 сентября 2019

Есть две проблемы:

1.

subject.doOnNext(result -> Log.d("Subject", "accept: " + result));

В приведенном выше коде результат doOnNext не подписан.doOnNext не подписывается на апстрим самостоятельно, как и многие другие операторы.Измените это, например:

subject.doOnNext(result -> Log.d("Subject", "accept: " + result)).subscribe();

2.

observableSubject
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(subject);

observableSubject.onNext(1);
observableSubject.onComplete();

В приведенном выше коде onComplete вызывается сразу после .onNext.Это может вызвать проблемы с синхронизацией при отправке значений.

Измените код выше на

observableSubject
            .subscribe(subject); // subscribe on the same thread so that everything happens sequentially. 

observableSubject.onNext(1);
observableSubject.onComplete();

или

Subject<Integer> observableSubject = BehaviorSubject.create();
observableSubject
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(subject);

observableSubject.onNext(1);
// observableSubject.onComplete(); // don't call onComplete/
...