Я новичок в ReactiveX, и у меня есть случай, когда я хочу, чтобы моя наблюдаемая отправляла данные позднему подписчику (всякий раз, когда наблюдатель подписывается, наблюдаемая должна излучать те же данные, что и ранее). Я создал этот класс Observable, который предоставляет один и тот же экземпляр ReplaySubject всем наблюдателям (это одноэлементный класс).
public class AccountsObservable {
private static ConnectableObservable<String> hotObservable;
private static AccountsObservable accountsObservable;
public static AccountsObservable getObject() {
if (accountsObservable == null) {
accountsObservable = new AccountsObservable();
}
return accountsObservable;
}
public ConnectableObservable<String> getObservable() {
if (hotObservable == null) {
Observable<String> observable = ReplaySubject.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("XYZ");
emitter.onComplete();
}
});
hotObservable = observable.replay();//publish
}
return hotObservable;
}
}
Аналогично, это класс наблюдателя, который создает новый экземпляр наблюдателя.
public class AccountsObserver {
AccountsFetchListener listener;
public AccountsObserver(AccountsFetchListener listener) {
this.listener = listener;
}
public Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String accounts) {
listener.onSuccess(accounts);
}
@Override
public void onError(Throwable e) {
listener.onFailure();
}
@Override
public void onComplete() {
}
};
}
public interface AccountsFetchListener {
void onSuccess(String accounts);
void onFailure();
}
}
Вот функция, где я проверяю эти наблюдаемые
private void testObs() {
ConnectableObservable<String> observable = AccountsObservable.getObject().getObservable();
Observer<String> observer = new AccountsObserver(new AccountsObserver.AccountsFetchListener() {
@Override
public void onSuccess(String accounts) {
Log.e("DATA -> ", accounts);
}
@Override
public void onFailure() {
}
}).getObserver();
observable.subscribe(observer);
observable.connect();
}
Я вызывал эту функцию "testObs ()" 5 раз, но она отправляла данные только 2 раза. Кажется, проблема в классе AccountsObservable, где я предоставляю экземпляр ReplaySUbject. Спасибо