Пользовательский RxJava Observable ничего не генерирует при подписке - PullRequest
0 голосов
/ 24 декабря 2018

Я получил этот метод от здесь , и он хорошо работает:

@Throws(IOException::class)
fun readTextFromUri(ctx: Context, uri: Uri): String {
    val stringBuilder = StringBuilder()
    ctx.contentResolver.openInputStream(uri)?.use { inputStream ->
        BufferedReader(InputStreamReader(inputStream)).use { reader ->
            var line: String? = reader.readLine()
            while (line != null) {
                stringBuilder.append("$line\n")
                line = reader.readLine()
            }
        }
    }
    return stringBuilder.toString()
}

Затем преобразовал его в этот метод, который возвращает каждую строку, используя Observable:

fun getUriAsStringObservable(ctx: Context, uri: Uri): Observable<String> {
    return Observable.create {
        try {
            ctx.contentResolver.openInputStream(uri)?.use { inputStream ->
                BufferedReader(InputStreamReader(inputStream)).use { reader ->
                    var line: String? = reader.readLine()
                    while (line != null) {
                        it.onNext(line)
                        line = reader.readLine()
                    }
                    it.onComplete()
                }
            }
        } catch (e: IOException) {
            it.onError(e)
        }
    }
}

Но это не сработало, как я ожидал, после подписки на него ничего не печаталось:

getUriAsStringObservable(this, uri)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        print("Next: $it")
    }
    .doOnError {
        print("Error: $it")
    }
    .doOnComplete {
        print("completed")
    }
    .subscribe()

В чем моя ошибка?

1 Ответ

0 голосов
/ 27 декабря 2018

Я нашел три способа решения моей проблемы:

1) Используйте Thread.sleep(1) после передачи каждого элемента.

2) Используйте пользовательский планировщик, который имеет не deamon поток ссылка .

3) Используйте Flowable с BackpressureStrategy.BUFFER вместо Observable (Лучший способ).

Flowable.create({
    try {
        ctx.contentResolver.openInputStream(uri)?.use { inputStream ->
            BufferedReader(InputStreamReader(inputStream)).use { reader ->
                var line: String? = reader.readLine()
                while (line != null) {
                    it.onNext(line)
                    line = reader.readLine()
                }
                it.onComplete()
            }
        }
    } catch (e: IOException) {
        it.onError(e)
    }
}, BackpressureStrategy.BUFFER)

Спасибо, Джавид

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...