Использование share (): отправьте последнее значение новым подписчикам - PullRequest
1 голос
/ 17 июня 2020
@Override
public void onPause() {
    super.onPause();
    compositeDisposable.clear();
}

@Override
public void onResume() {
    super.onResume();

    Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
            .distinctUntilChanged()
            .share();

    compositeDisposable.add(distanceFlowable)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> {Log.w("observer 1", data.value)});

    compositeDisposable.add(distanceFlowable)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> {Log.w("observer 2", data.value)});
}

Код работает хорошо, когда distanceFlowable постоянно получает новые изменения. Однако в сценарии, когда на расстоянии только одна публикация, Flowable получает уведомление только наблюдатель 1 .

Он ведет себя следующим образом:

  1. сообщение на distanceFlowable
  2. наблюдатель 1 подписывается
  3. logcat печатает "наблюдатель 1: 133"
  4. наблюдатель 2 подписывается

Я бы хотел он должен вести себя как:

  1. сообщение на расстоянииFlowable
  2. наблюдатель 1 подписывается
  3. logcat печатает «наблюдатель 1: 133»
  4. наблюдатель 2 подписывается
  5. logcat печатает «наблюдатель 2: 133»

Я пробовал использовать ConnectedFlowable вместо publish(), а затем connect() на поток после обоих наблюдателя Подписаны 1 и наблюдатель 2 . Но тогда он по-прежнему публикуется как текучий даже после того, как compositeDisposable очищен, и никто не слушает.

Какой предпочтительный способ решить эту проблему?

Ответы [ 2 ]

1 голос
/ 17 июня 2020

Просто используйте .compose(ReplayingShare.instance()); из https://github.com/JakeWharton/RxReplayingShare

Это должно работать.

1 голос
/ 17 июня 2020

Я пропустил, что flowable.connect() возвращает Disposable, который я мог бы добавить к моему compositeDisposable, таким образом он очищается в onPause.

...