Как разорвать связь между Observables и ovserver до того, как все элементы будут использованы - PullRequest
0 голосов
/ 06 ноября 2018

Я хотел бы знать, как я могу разорвать связь между наблюдаемыми и наблюдателем раньше, чем все обсевабы будут израсходованы. Я знаю, что это можно сделать с помощью одноразовых ... но как я могу получить ссылку на одноразовый предмет, как в приведенном ниже примере заранее спасибо

код

Observable<List<List<Person>>> observables = Observable.just(Main.getPersons());
    observables
    .concatMap(ll->{
        //how to display the size of List<List<Person>>
        //System.out.println("ll: " + ll.size());
        return Observable.fromIterable(ll)
                .concatMap(l->Observable.fromIterable(l))
                .filter(p->p.getAge().orElse(-1) <44)
                .map(g->g.getName().map(s->s+"_test").get()+ "  " + g.getAge().orElse(0));
    }
            )
    //.subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .blockingSubscribe(new Observer() {

        @Override
        public void onComplete() {
            // TODO Auto-generated method stub

        }

        @Override
        public void onError(Throwable arg0) {
            // TODO Auto-generated method stub
            System.out.println("onError: " + arg0);
        }

        @Override
        public void onNext(Object arg0) {
            // TODO Auto-generated method stub
            System.out.println("onNext: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            // TODO Auto-generated method stub

        }
    });
...