CompletableTransformer не применяет подписку и наблюдение в восходящем потоке - PullRequest
0 голосов
/ 25 мая 2019

Я создаю первое автономное приложение в качестве своего побочного проекта с использованием rxKotlin, MVVM + Clean Architecture, и вчера я решил отказаться от подписки на шаблон и наблюдать за ней с помощью трансформаторов. Я быстро понял, что применение функции трансформаторов игнорируется.

Вот код моего базового варианта использования (интерактор):

abstract class CompletableUseCase(private val transformer: CompletableTransformer) {

    abstract fun createCompletable(data: Map<String, Any>? = null) : Completable

    fun completable(data: Map<String, Any>? = null) : Completable {
        return createCompletable(data).compose(transformer)
    }
}

А вот реализация конкретного интерактора:

class SaveRouteInteractor(
    transformer: CompletableTransformer,
    private val routeRepository: RouteRepository
) : CompletableUseCase(transformer) {

    companion object {
        private const val PARAM_ROUTE = "param_route"
    }

    fun saveRoute(route: Route) : Completable {
        val data = HashMap<String, Route>()
        data[PARAM_ROUTE] = route
        return completable(data)
    }

    override fun createCompletable(data: Map<String, Any>?): Completable {
        val routeEntity = data?.get(PARAM_ROUTE)

        routeEntity?.let {
            return routeRepository.saveRoute(routeEntity as Route)
        } ?: return Completable.error(IllegalArgumentException("Argument @route must be provided."))
    }
}

Мой пользовательский преобразователь, который передается конструктору SaveRouteInteractor:

class IOCompletableTransformer(private val mainThreadScheduler: Scheduler) : CompletableTransformer {

    override fun apply(upstream: Completable): CompletableSource {
        return upstream.subscribeOn(Schedulers.io()).observeOn(mainThreadScheduler)
    }
}

И реализация метода RouteRepository:

override fun saveRoute(route: Route): Completable {
        return localRouteSource.saveRoute(route)
            .flatMap { localID ->
                route.routeId = localID
                remoteRouteSource.saveRoute(route)
            }
            .flatMapCompletable { localRouteSource.updateRouteID(route.routeId, it) }
    }

Я использую Room в качестве локального источника, поэтому после вызова интерактивного сохранения в моей ViewModel я получаю исключение IlligalStateException, сообщающее, что мне не разрешен доступ к базе данных в главном потоке.

Может быть, я что-то упускаю, но кажется, что функция преобразования игнорируется. Я отладил этот метод, и он применяет подписку и наблюдение в восходящем потоке.

Спасибо за помощь заранее, Пейс!

1 Ответ

1 голос
/ 27 мая 2019

Трудно сказать, где проблема, потому что код неполный.

Например, здесь:

    return localRouteSource.saveRoute(route)
        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }
        .flatMapCompletable { localRouteSource.updateRouteID(route.routeId, it) }

Полагаю, localRouteSource.saveRoute() использует интерактор, который вы нам показываете, но неясно, как реализованы remoteRouteSource.saveRoute() или localRouteSource.updateRouteID().

они также должны быть подписаны в потоке ввода-вывода.

Как правило, вы должны переключать нити, когда ЗНАЕТЕ, что вам это нужно.

Другими словами, вы должны использовать subscribeOn() в тех местах, где вы знаете, что делаете ввод-вывод как можно ближе к реальной работе. Вместо этого ObserveOn должен использоваться, когда вы знаете, что вам нужно получить эти результаты в потоке пользовательского интерфейса и что вы можете получить в каком-то другом потоке.

в вашем примере абсолютно не нужно использовать observeOn(MAIN_THREAD), единственный раз, когда он вам нужен (я полагаю), это когда вы хотите показать результат.

Пара других вещей:

Этот код

override fun createCompletable(data: Map<String, Any>?): Completable {
    val routeEntity = data?.get(PARAM_ROUTE)

    routeEntity?.let {
        return routeRepository.saveRoute(routeEntity as Route)
    } ?: return Completable.error(IllegalArgumentException("Argument @route must be provided."))
}

он оценивается во время, когда вызывается метод, а не когда подписка завершается.

Другими словами, он нарушает контракт Rx и вычисляет data?.get(PARAM_ROUTE) при вызове метода. Если он неизменный, нет большой разницы, но если он может изменить значение во время выполнения, он должен быть заключен в Completable.defer { }

Наконец, здесь

        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }

вы модифицируете что-то вне цепочки (route.routeId = localID), это называется побочным эффектом.

будьте осторожны с такими вещами, Rx построен так, чтобы его было безопаснее использовать с неизменяемыми объектами.

Лично я бы не возражал, если вы понимаете, что происходит и когда это может создавать проблемы.

...