Как заставить карту ждать завершения обработки текущего элемента индекса, а затем принять следующий элемент для обработки, используя Rx Java? - PullRequest
0 голосов
/ 27 апреля 2020

Я пытаюсь преобразовать входные потоки в файл. Поэтому, когда пользователь выбирает 1 изображение, все работает хорошо. Но когда пользователь выбирает несколько изображений, например 4, приведенный ниже код не работает должным образом; Я вижу только 2 пути к файлам в журнале оператора.


compositeDisposable.add(Observable.fromIterable(inputStreamList)
                .map {
                    FileUtils.saveInputStreamToFile(it, directory, 500)
                }.toList()
                .toObservable()
                .subscribeOn(schedulerProvider.io())
                .subscribe({
                    it.forEach {file->
                        Log.d("TAG", "Path ${file.path}")
                    }
                    Log.d("TAG", "Size ${it.size}")
                }, {

                })
        )

Вот метод saveInputStreamToFile


fun saveInputStreamToFile(input: InputStream, directory: File, height: Int): File? {
        val currentTime = dateFormat.format(Date())
        val imageName = "_$currentTime"
        val temp = File(directory.path + File.separator + "flab\$file\$for\$processing")
        try {
            val final = File(directory.path + File.separator + imageName + ".$IMAGE_JPG")
            val output = FileOutputStream(temp)
            try {
                val buffer = ByteArray(4 * 1024) // or other buffer size
                var read: Int = input.read(buffer)
                while (read != -1) {
                    output.write(buffer, 0, read)
                    read = input.read(buffer)
                }
                output.flush()
                saveBitmap(decodeFile(temp, height)!!, final.path, IMAGE_JPG, 80)
                return final
            } finally {
                output.close()
                temp.delete()
            }
        } finally {
            input.close()
        }
    }


Я хочу, чтобы следующий входной поток был взят только после преобразования текущего входного потока в файл. Как этого добиться? Пожалуйста, помогите

1 Ответ

1 голос
/ 27 апреля 2020

В RX вы можете сделать что-то вроде этого:

    Observable.fromCallable {
        return inputStreamList.map {
            FileUtils.saveInputStreamToFile(it, directory, 500)
        }.toList()

    }.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(object : Observer<List<File>> {
            override fun onComplete() {

            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(files: List<File>) {
                //here you can iterate and complete do your work
                files.forEach { file ->
                    Log.d("TAG", "Path ${file.path}")
                }
                Log.d("TAG", "Size ${files.size}")

            }

            override fun onError(e: Throwable) {
            }

        })

Это гарантирует, что ваша работа будет выполняться синхронно в фоновом режиме, и вы увидите результат в onNext() в MainThread, как указано.

Убедитесь, что вы используете пакет io.reactivex

РЕДАКТИРОВАТЬ:

избавиться от пакета RX и установить их в build.gradle

implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
...