Rx Java Проблема с чтением файла с помощью оператора Observable и take - PullRequest
0 голосов
/ 06 мая 2020

Моя рабочая среда - JDK 1.6 и Rx Java 2

Я хочу создать Observable, который выдает элемент, который представляет собой строку строки файла, читаемую через BufferedReader следующим образом:

...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
    return new ObservableSource<String> call() throws Exception {
        return new ObservableSource<String>() {
            public void subscribe(Observer<String> observer) {
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(filePath));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        observer.onNext(line);
                    }
                    observer.onComplete();

                    ... catching exception and close reader
                }
            }
        }
    }
});

Я также хочу создать Observer, который наблюдает за вышеупомянутым Observable с одним оператором take (count) следующим образом:

fileLineObservable.take(2)
                  .subscribe(new Consumer<String>() {
                       public void onNext(String line) {
                           ... do something with the file line string
                       }
                  });

Я встречаю NullPointerException при выполнении вышеуказанного кода, и я знаю почему. NPE вызван тем, что второй вызов onNext приводит к выполнению onComplete в экземпляре TakeObserver, а внутри метода onComplete вызывается upstream.dispose, который не установлен (null). Переменная восходящего потока TakeObserver должна быть установлена ​​с помощью onSubscribe (одноразового использования), когда он подписывается на Observable.

Как решить эту проблему? Должен ли я реализовать свой собственный класс Disposable для установки восходящего потока TakeObserver?

1 Ответ

0 голосов
/ 10 мая 2020

А что насчет этого решения?

Observable<String> observableFile2(Path path) {
        return Observable.using(
                () -> Files.newBufferedReader(path),
                reader -> {
                    return Observable.fromIterable(() -> {
                        return new Iterator<>() {
                            private String nextLine = null;

                            @Override
                            public boolean hasNext() {
                                try {
                                    nextLine = reader.readLine();
                                    return nextLine != null;
                                } catch (Exception ex) {
                                    return false;
                                }
                            }

                            @Override
                            public String next() {
                                if (nextLine != null) {
                                    return nextLine;
                                }
                                throw new IllegalStateException("nextLine can not be null.");
                            }
                        };
                    });
                },
                BufferedReader::close
        );
    }
  • Observable # using гарантирует, что BufferedReader правильно закрыт при одноразовом / onError
  • Observable # fromIterable обертывает вызовы readLine и обрабатывает onComplete для нас.

Тестирование

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")

Тесты

@Test
void name() {
    observableFile2(hello).take(2)
            .test()
            .assertValues("line0", "line1")
            .assertComplete();
}

@Test
void name2() {
    observableFile2(hello).take(10)
            .test()
            .assertValues("line0", "line1", "line2", "line3")
            .assertComplete();
}

@Test
void name3() {
    observableFile2(hello2)
            .test()
            .assertComplete();
}
...