RxJava: апстрим никогда не завершается при проглатывании ошибки - PullRequest
0 голосов
/ 03 октября 2018

Я использую RxJava для перебора списка файлов, выполняю сетевой вызов для загрузки каждого файла, затем собираю файлы, которые были успешно загружены в список, и сохраняю эти файлы в подписчике в случае успеха.

Этот код работает, за исключением случаев, когда возникает ошибка.Поведение должно состоять в том, что он регистрирует ошибку и продолжает работу, что и происходит, за исключением случаев, когда возникает ошибка. Лямбда-подписчик onSuccess никогда не вызывается.

Ожидает ли наблюдатель, что будет выдано такое же количество элементов, как воригинальный повторяемый?Как я могу пропустить ошибки и завершить их после того, как все элементы будут повторены?Есть ли что-то, кроме Single.never(), которое не выполнит передачу ошибки в нисходящий поток?

queryFiles()?.let { files ->
    Observable.fromIterable(files)
            .flatMapSingle { file ->
                uploadFile(file)
                        .onErrorResumeNext { error ->
                            log(error)
                            Single.never() // if this is returned onSuccess is never called
                        }
                        .map { response ->
                            file.id = response.id
                            file
                        }
            }
            .toList()
            .subscribe( { uploadedFiles ->
                persist(uploadedFiles) // if error occurs above, this is never called
            }, { error ->
                log(error)
            })
}

Ответы [ 2 ]

0 голосов
/ 03 октября 2018

Вот как я обрабатывал это в прошлом, используя метод zip.

  // create an observable list that you can process for you file uploads
  val responses: Response = listOf<Response>()

  queryFiles()?.let { file ->

    val observable = Observable.create(ObservableOnSubscribe<Response> { emitter ->
      // you can modify this section to your data types
      try {
        // with your uploadFile method you might be able to just add them
        // all the responses list
        emitter.onNext(uploadFile(file))
        emitter.onComplete()
      } catch (e: Exception) {
        emitter.onError(e)
      }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
    responses.add(observable)    
  }

  // i setup a simple booleanArray to handle success/failure but you can add
  // all the files that fail to a list and use that later    
  val isSuccessful = booleanArrayOf(true)
  Observable.zip<Response, Boolean>(responses, Function<Array<Any>, Boolean> { responses ->
    var isSuccessful: Boolean? = java.lang.Boolean.TRUE
    // handle success or failure
    isSuccessful
  }).subscribe(Consumer<Boolean> { aBoolean -> isSuccessful[0] = aBoolean!! }, Consumer<Throwable> { throwable ->
    isSuccessful[0] = false
  }, Action {
    // handle your OnComplete here
    // I would check the isSuccessful[0] and handle the success or failure        
  })

Это создает все ваши загрузки в список наблюдаемых, которые можно обрабатывать и объединять с zip метод.Это объединит их все, когда они будут объединены в любой массив, чтобы вы могли зациклить их - ваш результат из метода uploadFile ().Этот пример проверяет на успех или неудачу из ответов, которые возвращаются.Я удалил большую часть логики, где находится комментарий // handle success or failure.В методе функции вы можете отслеживать загрузки файлов, которые завершились неудачно или успешно.

0 голосов
/ 03 октября 2018

Ваша проблема в том, что Single может привести только к двум значениям: успешный результат или сбой.Чтобы перевести сбой в «игнорируемое» состояние, можно сначала преобразовать его в Maybe, а затем использовать по существу тот же код для обработки сбоя и успеха.

Maybe.onErrorResumeNext с возвращаемым значением Maybe.empty() приведет к 0 или 1 результату, в то время как Maybe.map выполняется только при наличии значения, точно решая проблему, как вы ее описали.

Адаптированный код:

        .flatMapMaybe { file ->
            uploadFile(file).toMaybe()
                    .onErrorResumeNext { error: Throwable ->
                        log(error)
                        Maybe.empty()
                    }
                    .map { response ->
                        file.id = response.id
                        file
                    }
        }
...