У меня есть метод getPlaces
в моем хранилище:
override fun getPlaces(filter: FilterRequest): Flowable<List<Place>> {
return from(placesApi.filter(filter))
.doOnSuccess {
placesDao.savePlaces(it)
}
.flatMapPublisher { it ->
placesDao.getPlaces(it.map { it.placeId })
}
}
Этот метод собирает результат из API, затем сохраняет результаты в базе данных и возвращает текучую среду с местами, полученными по id избаза данных как Flowable
:
@Query("select * from Places where placeId in (:placesIds)")
fun getPlaces(placesIds: List<String>) : Flowable<List<Place>>
Теперь каждый раз, когда я изменяю один из этих объектов, я вижу изменения во всем приложении.
Теперь я хочу объединить эти результаты с расстояниемиз текущего местоположения, например, так:
override fun addDistanceToPlaces(req: Flowable<List<Place>>): Flowable<List<Place>> {
return req
.zipWith(getLastLocation().toFlowable(BackpressureStrategy.LATEST),
BiFunction<List<Place>, Location, List<Place>> { places, location ->
places.forEach {
var placeLocation = Location(it.placeName)
placeLocation.latitude = it.latitude
placeLocation.longitude = it.longitude
it.distance = location.distanceTo(placeLocation)
}
places.sortedBy {
it.distance
}
})
.onErrorResumeNext { t: Throwable ->
req
}
}
Это работает, однако, если я применю это, я теряю "обновления" из комнаты;Об изменениях не сообщается наблюдателям, поэтому я должен выполнить обновление вручную.
Почему это происходит?Разве zip
не должен объединять выбросы из обоих источников?