Я работаю над приложением, где мы загружаем набор файлов и обрабатываем их все на лету.Вот что я делаю:
Observable.from(fileUrls)
.compose(downloadAndPersistFiles())
.compose(processPersistedData())
.subscribe()
fileUrls
- это набор файлов URL.downloadAndPersistFiles
извлекает данные из загруженных файлов и сохраняет их в локальной базе данных.Он возвращает событие каждый раз, когда я успешно загружал и сохранял данные файла.Более того, я использую Schedulers.io()
, чтобы ускорить пул потоков для максимально быстрой загрузки этих файлов.
private <T> Observable.Transformer<T, T> downloadAndPersistFiles() {
return observable -> observable
.flatMap(fileUrls -> Observable.from(fileUrls)
.subscribeOn(Schedulers.io())
.compose(download())
.compose(saveToDb());
}
Для каждого успешно загруженного и обработанного файла я запускаю дополнительную задачу, которая в основномНабор запросов к БД для извлечения дополнительных данных.
private <T> Observable.Transformer<T, T> processPersistedData() {
return observable -> observable
//modified place - debounce, throttleFirst, throttleLast etc
.flatMap(file -> Observable.from(tasks)
.compose(runQueryToExtractData())
.toList()
.flatMap(ignored -> Observable.just(file)));
}
Я знаю, что это плохо масштабируется, набор данных в базе данных растет, поэтому запросы занимают все больше и больше времени.
processPersistedData
вызывается для каждого события из downloadAndPersistFiles
(он использует пул потоков), поэтому в какой-то момент параллельно выполняется несколько processPersistedData
операций, и я хочу ограничить его только одной.
Вот что я пробовал до сих пор:
debounce
с тайм-аутом - это добавляет дополнительную задержку после каждого загруженного файла, и если загрузка файлов занимает меньше времени, чем тайм-аут,поток будет голодать до тех пор, пока не появится файл, достаточно большой, чтобы его загрузка и сохранение заняли больше времени throttleLast
- он добавляет дополнительную задержку после каждого загруженного файла, потому что мне нужно ждать, пока закончится временное окно throttleFirst
- нет задержки для первого файла, но я могу пропустить несколько последних событий - лучшее решение, которое я нашел до сих пор.Основная проблема, с которой я столкнулся, заключается в том, что я не могу синхронизировать загрузку файлов и выполнение запросов - в начале запросы выполняются очень быстро, поэтому я хочу использовать короткий тайм-аут, насколько это возможно, но со временем они могут занять более 10-20 секунд, поэтомуочевидно, я хотел бы замедлить в это время.Более того, это не мешает запустить два debounce with selector
- это звучит идеально!Я мог бы использовать processPersistedData
в качестве селектора, который будет отбрасывать все события, когда processPersistedData
работает, и потреблять любые новые события, как только он завершится, но после того, как я попробовал его, processPersistedData
запускался каждый раз - новый поток processPersistedData
Лайк был создан для каждого события.
Есть ли у вас какие-либо идеи, как эта проблема может быть подходом?Или я пропустил, когда попробовал debounce with selector
?