RxJava2 - объединяет значения и побочные эффекты из потока - PullRequest
0 голосов
/ 14 января 2020

Я пытаюсь извлечь некоторые общие логи c, основанные на RxJava2, в повторно используемые компоненты. Давайте представим, что у меня есть следующий фрагмент кода:

someSingle
    .doOnSuccess { // update UI based on side effect }
    .subscribeOn(...)
    .observeOn(...)
    .subscribe(
        value -> // update UI based on value
        throwable -> // handle error
    )

Я хочу обернуть это в повторно используемый компонент, предоставляя метод, который возвращает Flowable событий. Клиенты будут получать события и соответственно обновлять пользовательский интерфейс. Моя цель - не ссылаться на представление внутри повторно используемого компонента. Я хочу, чтобы метод был примерно таким:

fun reusableMethod(...) : Flowable<Event> { ... }

Event - это запечатанный класс, включающий два подтипа - SideEffectEvent и ValueEvent.

Что лучший способ преобразовать поток из первого фрагмента, чтобы я мог получить как побочный эффект, так и значение, которое будет передаваться как текущие значения?

В настоящее время у меня есть следующее решение, но я Я не очень доволен этим, потому что он выглядит немного неуклюжим и сложным:

private val sideEffectEvents = PublishProcessor.create<SideEffectEvent>()

fun reusableMethod(...) = 
    Flowable.merge(
        someSingle.doOnSuccess { sideEffectEvents.onNext(SideEffectEvent()) },
        sideEffectEvents
    )
    .subscribeOn(...)
    .observeOn(...)

Я также рассмотрел несколько альтернатив:

  • Уведомление клиента за SideEffectEvent s использование обратного вызова, который передается на someReusableMethod() - выглядит очень неестественно и наличие обратного вызова и потока для подписки не является хорошим стилем кода
  • Используйте один PublishProcessor. Опубликуйте побочные эффекты и используйте его, чтобы подписаться на оригинал Single. Предоставьте метод cleanUp() в повторно используемом компоненте, чтобы клиент мог распоряжаться потоком, когда он решит.

Я с нетерпением жду предложений и идей.

Ответы [ 2 ]

0 голосов
/ 15 января 2020

Как насчет этого:

До:

someSingle
    .operation1()
    .operation2()
    .doOnSuccess { // update UI based on side effect }
    .operation3()
    .operation4()
    .subscribeOn(...)
    .observeOn(...)
    .subscribe(
        value -> // update UI based on value
        throwable -> // handle error
    )

Многоразовое использование:

fun reusableMethod(...): Flowable<Event> = 
    someSingle
        .operation1()
        .operation2()
        .flatMapPublisher {
            Single.concat(
                Single.just(getSideEffectEvent(it)),
                Single.just(it)
                    .operation3()
                    .operation4()
                    .map { value -> getValueEvent(value) }
            )
        }
        .subscribeOn(...)
        .observeOn(...)

Вы можете еще больше упростить это, используя Flowable#startWith и избегая Single#concat()

0 голосов
/ 14 января 2020

Прежде всего это не должно быть Flowable. Это может быть просто Observable. Но приведенное ниже решение должно работать в обоих случаях. подробнее здесь Observable vs Flowable

Этот код не тестировался, я написал его, чтобы дать вам упрощенное представление о том, как этого добиться.

// a sealed class representing current state 
sealed class ViewState {
    object Loading : ViewState() // using object because we do not need any data in cass of loading
    data class Success(val data: List<Model>) : ViewState()
    data class Error(val t: Throwable) : ViewState()
}

// an observalbe or flowable returning a single object ViewState
// it will always return ViewState class containing either data or error or loading state
return service.getData()
    .map { data -> ViewState.Success(data) } // on successful data fetch
    .startWith(ViewState.Loading()) // show loading on start of fetch
    .onErrorReturn { exception -> ViewState.Error(exception) } // return error state 
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

// somewhere in Activity or in multiple activities subscribe to above observable
subscribe({ viewState ->
     when {
         viewState.Loading -> showProgressView()
         viewState.Error -> showErrorView(viewState.t)
         viewState.Success -> showData(viewState.data)
         else -> IllegalArgumentException("Invalid Response")
     }

})
...