Я хотел бы знать, как я могу разорвать связь между наблюдаемыми и наблюдателем раньше, чем все обсевабы будут израсходованы.
Я знаю, что это можно сделать с помощью одноразовых ... но как я могу получить ссылку на одноразовый предмет, как в приведенном ниже примере
заранее спасибо
код
Observable<List<List<Person>>> observables = Observable.just(Main.getPersons());
observables
.concatMap(ll->{
//how to display the size of List<List<Person>>
//System.out.println("ll: " + ll.size());
return Observable.fromIterable(ll)
.concatMap(l->Observable.fromIterable(l))
.filter(p->p.getAge().orElse(-1) <44)
.map(g->g.getName().map(s->s+"_test").get()+ " " + g.getAge().orElse(0));
}
)
//.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.blockingSubscribe(new Observer() {
@Override
public void onComplete() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
System.out.println("onError: " + arg0);
}
@Override
public void onNext(Object arg0) {
// TODO Auto-generated method stub
System.out.println("onNext: " + arg0);
}
@Override
public void onSubscribe(Disposable arg0) {
// TODO Auto-generated method stub
}
});