Объединение нескольких источников приема - PullRequest
0 голосов
/ 30 ноября 2018

Представим себе следующую ситуацию: у нас есть функция, которая возвращает наблюдаемый источник объектов со стороны сервера:

private fun getStatistics(): Observable<TestStatistics> {
        return Observable
                .fromIterable(listOf(
                        TestStatistics(1.1, 1.2, 4),
                        TestStatistics(2.1, 2.2, 1),
                        TestStatistics(3.1, 3.2, 99)
                ))
                .delay(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
    }

Объект TestStatistics:

data class TestStatistics(val doubleCashBack: Double, val doubleAmount: Double, val currencyId: Int)

Как видно из ответа серверау нас есть currencyId, который указывает нам на сущность Currency:

data class TestCurrency(val currencyId: Int, val currencySign: String)

И у нас есть другая функция, которая возвращает единый источник сущности Currency по идентификатору из базы данных:

private fun getCurrencyById(id: Int): Single<TestCurrency> {
        return when (id) {
            1 -> Single.just(TestCurrency(1, "!"))
            2 -> Single.just(TestCurrency(2, "@"))
            3 -> Single.just(TestCurrency(3, "#"))
            else -> Single.error(Exception("Currency not found"))
        }
                .delay(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
    }

Основная идея заключается в том, чтобывозьмите каждый испущенный объект Statistic, возьмите его свойства, объедините их, а затем выведите объединенный объект со свойствами и Currency в качестве объекта, поэтому возникает проблема, в этом случае мы должны взять currencyId, чтобы сначала получить успешно полученный объект Currency из базы данных, а затем создать объект-результатТаким образом, класс результата будет выглядеть следующим образом:

data class TestDashboardStatistics(val count: Int, val cashBack: Double, val amount: Double, val testCurrency: TestCurrency)

Но у меня есть некоторые проблемы с этой комбинацией наблюдаемых источников, запрос к серверу выполняется в одном потоке, база данных в другом и комбинирует код в третьем, поэтому я должен быть уверенчто я обработаю всю статистику, полученную с сервера,я буду игнорировать все ошибки, возвращенные из базы данных (только если я наконец найду валюту, если все запросы не будут выполнены, я должен вернуть значение по умолчанию), и сделаю только один успешный запрос к базе данных, поместит этот объект в объект-результат и даст егоback Функция Combing может выглядеть следующим образом:

private fun getCombinedStatistics(): Single<TestDashboardStatistics> {
        return Single.create<TestDashboardStatistics> {
            var transactionsAmount = 0.0
            var cashBackAmount = 0.0
            var count = 0
            var currency = TestCurrency(-1, "default")

            getStatistics().subscribe({ statistic ->
                ++count
                transactionsAmount += statistic.doubleAmount
                cashBackAmount += statistic.doubleCashBack
                getCurrencyById(statistic.currencyId).subscribe({ cur ->
                    // TODO do not request currency for future statistics because we have it now but
                    // TODO because different threads we can subscribe for new request before we will receive this result
                    currency = cur
                }, { err ->
                    // TODO ignore error if there is a hope that other statistics will have valid currency code
                })
            }, {
                // On requesting statistics error just throw it up
                Single.error<TestDashboardStatistics>(it)
            }, {
                // When all statistics will be received and precessed emit result
                // But it could be called even before we will receive any response from database
                Single.just(TestDashboardStatistics(count, cashBackAmount, transactionsAmount, currency))
            })
        }
    }

Одно решение, которое мне приходит, - это сделать запрос на получение валюты из базы данных, каким-то образом блокируя статистику обработки, поэтому обработка будет ждать, пока запрос db будет завершен, и перейти кдругой, но мои знания операторов Rx довольно плохие, поэтому я не знаю, как я могу это сделать.

1 Ответ

0 голосов
/ 30 ноября 2018

Я рекомендую оставить блокировку запроса базы данных, как вы предложили:

data class TestStatistics(val doubleCashBack: Double, val doubleAmount: Double, val currencyId: Int)
data class TestCurrency(val currencyId: Int, val currencySign: String)
data class TestDashboardStatistics(val count: Int?, val cashBack: Double, val amount: Double, val testCurrency: TestCurrency)

object Helloworld {
    private fun getStatistics(): Observable<TestStatistics> {
        return Observable
            .fromIterable(listOf(
                TestStatistics(1.1, 1.2, 4),
                TestStatistics(2.1, 2.2, 1),
                TestStatistics(3.1, 3.2, 99),
                TestStatistics(4.1, 4.3, 2),
                TestStatistics(5.1, 5.3, 3)
            ))
            .delay(2, TimeUnit.SECONDS)
    }

    private fun getCurrencyById(id: Int): TestCurrency? {
        // blocking call
        return when (id) {
            1 -> TestCurrency(1, "!")
            2 -> TestCurrency(2, "@")
            3 -> TestCurrency(3, "#")
            else -> null
        }
    }

    @JvmStatic
    fun main(args: Array<String>) {
        getStatistics()
            .map { getCurrencyById(it.currencyId) to it }
            .filter { it.first != null }
            .map { TestDashboardStatistics(null, it.second.doubleCashBack, it.second.doubleAmount, it.first!!) }
            .subscribe { println(it) }

        Thread.sleep(5000)
    }
}

Я сделал поле count обнуляемым, так как я не совсем понимаю, чего вы пытаетесь достичь.

Я бы также порекомендовал вам исключить вызовы subscribeOn из ваших вспомогательных методов и поместить их в метод main (вместе с функцией observeOn()), где вы объедините свою бизнес-логику.Таким образом, вы можете переключать потоки между различными операциями (например, подписаться на поток ui, делать вызовы БД в потоке io, выполнять сложные алгоритмы в потоке computation и т. Д.)

Надеюсь, что этопомогает:)

PS Насколько я понял ваш сценарий использования, все, что вам нужно, это простая map операция: TestStatistics -> TestDashboardStatistics.Если вы не хотите каждый раз заходить в базу данных на TestCurrency, вы можете кэшировать уже извлеченные экземпляры (используя Map ??).

...