RxJava - ReplaySubject только отправляет данные дважды - PullRequest
0 голосов
/ 30 июня 2019

Я новичок в ReactiveX, и у меня есть случай, когда я хочу, чтобы моя наблюдаемая отправляла данные позднему подписчику (всякий раз, когда наблюдатель подписывается, наблюдаемая должна излучать те же данные, что и ранее). Я создал этот класс Observable, который предоставляет один и тот же экземпляр ReplaySubject всем наблюдателям (это одноэлементный класс).

public class AccountsObservable {
    private static ConnectableObservable<String> hotObservable;
    private static AccountsObservable accountsObservable;


    public static AccountsObservable getObject() {
        if (accountsObservable == null) {
            accountsObservable = new AccountsObservable();
        }
        return accountsObservable;
    }

    public ConnectableObservable<String> getObservable() {
        if (hotObservable == null) {
            Observable<String> observable = ReplaySubject.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("XYZ");
                    emitter.onComplete();

                }
            });
            hotObservable = observable.replay();//publish
        }
        return hotObservable;
    }
}

Аналогично, это класс наблюдателя, который создает новый экземпляр наблюдателя.

public class AccountsObserver {
    AccountsFetchListener listener;

    public AccountsObserver(AccountsFetchListener listener) {
        this.listener = listener;
    }

    public Observer<String> getObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String accounts) {
                listener.onSuccess(accounts);
            }

            @Override
            public void onError(Throwable e) {
                listener.onFailure();
            }

            @Override
            public void onComplete() {

            }
        };

    }

    public interface AccountsFetchListener {
        void onSuccess(String accounts);

        void onFailure();
    }
}

Вот функция, где я проверяю эти наблюдаемые

private void testObs() {
    ConnectableObservable<String> observable = AccountsObservable.getObject().getObservable();
    Observer<String> observer = new AccountsObserver(new AccountsObserver.AccountsFetchListener() {
        @Override
        public void onSuccess(String accounts) {
            Log.e("DATA -> ", accounts);
        }

        @Override
        public void onFailure() {
        }
    }).getObserver();
    observable.subscribe(observer);
    observable.connect();

}

Я вызывал эту функцию "testObs ()" 5 раз, но она отправляла данные только 2 раза. Кажется, проблема в классе AccountsObservable, где я предоставляю экземпляр ReplaySUbject. Спасибо

1 Ответ

1 голос
/ 30 июня 2019

Ваш код работает нормально как есть, ваши журналы подавляются в logcat согласно this :

Мы объявили приложение слишком болтливым, когда оно регистрирует более 5линии в секунду.Пожалуйста, отправьте сообщение об ошибке владельцу приложения, которое создает этот спам для ведения журнала на уровне разработчика.Журналы имеют размер 256 КБ, что означает, что приложение создает атаку DOS и сокращает временной интервал журналов до 6 секунд (!), Делая его бесполезным для всех остальных.для logcat:

adb logcat -P '<pid or uid of your app>'

...