@Override
public void onPause() {
super.onPause();
compositeDisposable.clear();
}
@Override
public void onResume() {
super.onResume();
Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
.distinctUntilChanged()
.share();
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 1", data.value)});
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 2", data.value)});
}
Код работает хорошо, когда distanceFlowable постоянно получает новые изменения. Однако в сценарии, когда на расстоянии только одна публикация, Flowable получает уведомление только наблюдатель 1 .
Он ведет себя следующим образом:
- сообщение на distanceFlowable
- наблюдатель 1 подписывается
- logcat печатает "наблюдатель 1: 133"
- наблюдатель 2 подписывается
Я бы хотел он должен вести себя как:
- сообщение на расстоянииFlowable
- наблюдатель 1 подписывается
- logcat печатает «наблюдатель 1: 133»
- наблюдатель 2 подписывается
- logcat печатает «наблюдатель 2: 133»
Я пробовал использовать ConnectedFlowable
вместо publish()
, а затем connect()
на поток после обоих наблюдателя Подписаны 1 и наблюдатель 2 . Но тогда он по-прежнему публикуется как текучий даже после того, как compositeDisposable
очищен, и никто не слушает.
Какой предпочтительный способ решить эту проблему?