Как отсеять поток событий с помощью длинной задачи обработки в RxJava - PullRequest
0 голосов
/ 06 июня 2018

Я работаю над приложением, где мы загружаем набор файлов и обрабатываем их все на лету.Вот что я делаю:

Observable.from(fileUrls)
  .compose(downloadAndPersistFiles())
  .compose(processPersistedData())
  .subscribe()

fileUrls - это набор файлов URL.downloadAndPersistFiles извлекает данные из загруженных файлов и сохраняет их в локальной базе данных.Он возвращает событие каждый раз, когда я успешно загружал и сохранял данные файла.Более того, я использую Schedulers.io(), чтобы ускорить пул потоков для максимально быстрой загрузки этих файлов.

   private  <T> Observable.Transformer<T, T> downloadAndPersistFiles() {
    return observable -> observable
            .flatMap(fileUrls -> Observable.from(fileUrls)
                    .subscribeOn(Schedulers.io())
                    .compose(download())
                    .compose(saveToDb());
   }

Для каждого успешно загруженного и обработанного файла я запускаю дополнительную задачу, которая в основномНабор запросов к БД для извлечения дополнительных данных.

   private  <T> Observable.Transformer<T, T> processPersistedData() {
    return observable -> observable
             //modified place - debounce, throttleFirst, throttleLast etc
            .flatMap(file -> Observable.from(tasks)
                    .compose(runQueryToExtractData())
                    .toList()
                    .flatMap(ignored -> Observable.just(file)));
   }

Я знаю, что это плохо масштабируется, набор данных в базе данных растет, поэтому запросы занимают все больше и больше времени.

processPersistedData вызывается для каждого события из downloadAndPersistFiles (он использует пул потоков), поэтому в какой-то момент параллельно выполняется несколько processPersistedData операций, и я хочу ограничить его только одной.

Вот что я пробовал до сих пор:

  • debounce с тайм-аутом - это добавляет дополнительную задержку после каждого загруженного файла, и если загрузка файлов занимает меньше времени, чем тайм-аут,поток будет голодать до тех пор, пока не появится файл, достаточно большой, чтобы его загрузка и сохранение заняли больше времени
  • throttleLast - он добавляет дополнительную задержку после каждого загруженного файла, потому что мне нужно ждать, пока закончится временное окно
  • throttleFirst - нет задержки для первого файла, но я могу пропустить несколько последних событий - лучшее решение, которое я нашел до сих пор.Основная проблема, с которой я столкнулся, заключается в том, что я не могу синхронизировать загрузку файлов и выполнение запросов - в начале запросы выполняются очень быстро, поэтому я хочу использовать короткий тайм-аут, насколько это возможно, но со временем они могут занять более 10-20 секунд, поэтомуочевидно, я хотел бы замедлить в это время.Более того, это не мешает запустить два
  • debounce with selector - это звучит идеально!Я мог бы использовать processPersistedData в качестве селектора, который будет отбрасывать все события, когда processPersistedData работает, и потреблять любые новые события, как только он завершится, но после того, как я попробовал его, processPersistedData запускался каждый раз - новый поток processPersistedDataЛайк был создан для каждого события.

Есть ли у вас какие-либо идеи, как эта проблема может быть подходом?Или я пропустил, когда попробовал debounce with selector?

1 Ответ

0 голосов
/ 06 июня 2018

Оператор flatMap() принимает дополнительный параметр, который ограничивает количество параллельных операций.

private  <T> Observable.Transformer<T, T> processPersistedData() {
 return observable -> observable
        .flatMap(input -> Observable.from(tasks)
            //modified place - debounce, throttleFirst, throttleLast etc
            .compose(runQueryToExtractData())
            .toList()
            .flatMap(ignored -> Observable.just(input)), 1);
}

1 указывает, что flatMap() будет обрабатывать только один элемент за раз.

В качестве отступления, где у вас есть compose(runQueryToExtractData()), вы можете вместо этого использовать Completable.

...