Представим себе следующую ситуацию: у нас есть функция, которая возвращает наблюдаемый источник объектов со стороны сервера:
private fun getStatistics(): Observable<TestStatistics> {
return Observable
.fromIterable(listOf(
TestStatistics(1.1, 1.2, 4),
TestStatistics(2.1, 2.2, 1),
TestStatistics(3.1, 3.2, 99)
))
.delay(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
}
Объект TestStatistics:
data class TestStatistics(val doubleCashBack: Double, val doubleAmount: Double, val currencyId: Int)
Как видно из ответа серверау нас есть currencyId, который указывает нам на сущность Currency:
data class TestCurrency(val currencyId: Int, val currencySign: String)
И у нас есть другая функция, которая возвращает единый источник сущности Currency по идентификатору из базы данных:
private fun getCurrencyById(id: Int): Single<TestCurrency> {
return when (id) {
1 -> Single.just(TestCurrency(1, "!"))
2 -> Single.just(TestCurrency(2, "@"))
3 -> Single.just(TestCurrency(3, "#"))
else -> Single.error(Exception("Currency not found"))
}
.delay(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
}
Основная идея заключается в том, чтобывозьмите каждый испущенный объект Statistic, возьмите его свойства, объедините их, а затем выведите объединенный объект со свойствами и Currency в качестве объекта, поэтому возникает проблема, в этом случае мы должны взять currencyId, чтобы сначала получить успешно полученный объект Currency из базы данных, а затем создать объект-результатТаким образом, класс результата будет выглядеть следующим образом:
data class TestDashboardStatistics(val count: Int, val cashBack: Double, val amount: Double, val testCurrency: TestCurrency)
Но у меня есть некоторые проблемы с этой комбинацией наблюдаемых источников, запрос к серверу выполняется в одном потоке, база данных в другом и комбинирует код в третьем, поэтому я должен быть уверенчто я обработаю всю статистику, полученную с сервера,я буду игнорировать все ошибки, возвращенные из базы данных (только если я наконец найду валюту, если все запросы не будут выполнены, я должен вернуть значение по умолчанию), и сделаю только один успешный запрос к базе данных, поместит этот объект в объект-результат и даст егоback Функция Combing может выглядеть следующим образом:
private fun getCombinedStatistics(): Single<TestDashboardStatistics> {
return Single.create<TestDashboardStatistics> {
var transactionsAmount = 0.0
var cashBackAmount = 0.0
var count = 0
var currency = TestCurrency(-1, "default")
getStatistics().subscribe({ statistic ->
++count
transactionsAmount += statistic.doubleAmount
cashBackAmount += statistic.doubleCashBack
getCurrencyById(statistic.currencyId).subscribe({ cur ->
// TODO do not request currency for future statistics because we have it now but
// TODO because different threads we can subscribe for new request before we will receive this result
currency = cur
}, { err ->
// TODO ignore error if there is a hope that other statistics will have valid currency code
})
}, {
// On requesting statistics error just throw it up
Single.error<TestDashboardStatistics>(it)
}, {
// When all statistics will be received and precessed emit result
// But it could be called even before we will receive any response from database
Single.just(TestDashboardStatistics(count, cashBackAmount, transactionsAmount, currency))
})
}
}
Одно решение, которое мне приходит, - это сделать запрос на получение валюты из базы данных, каким-то образом блокируя статистику обработки, поэтому обработка будет ждать, пока запрос db будет завершен, и перейти кдругой, но мои знания операторов Rx довольно плохие, поэтому я не знаю, как я могу это сделать.