Моя рабочая среда - JDK 1.6 и Rx Java 2
Я хочу создать Observable, который выдает элемент, который представляет собой строку строки файла, читаемую через BufferedReader следующим образом:
...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
return new ObservableSource<String> call() throws Exception {
return new ObservableSource<String>() {
public void subscribe(Observer<String> observer) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(filePath));
String line = null;
while ((line = reader.readLine()) != null) {
observer.onNext(line);
}
observer.onComplete();
... catching exception and close reader
}
}
}
}
});
Я также хочу создать Observer, который наблюдает за вышеупомянутым Observable с одним оператором take (count) следующим образом:
fileLineObservable.take(2)
.subscribe(new Consumer<String>() {
public void onNext(String line) {
... do something with the file line string
}
});
Я встречаю NullPointerException при выполнении вышеуказанного кода, и я знаю почему. NPE вызван тем, что второй вызов onNext приводит к выполнению onComplete в экземпляре TakeObserver, а внутри метода onComplete вызывается upstream.dispose, который не установлен (null). Переменная восходящего потока TakeObserver должна быть установлена с помощью onSubscribe (одноразового использования), когда он подписывается на Observable.
Как решить эту проблему? Должен ли я реализовать свой собственный класс Disposable для установки восходящего потока TakeObserver?