операторы задержки и интервала не работают должным образом - PullRequest
0 голосов
/ 02 ноября 2018

Как показано ниже, я создаю Observables. Я хотел бы подождать определенное количество времени в секундах, как показано в коде. Поэтому я использовал либо задержку или интервал оператор. Я ожидал, что код будет ждать 5 секунд, а затем System.out.println от наблюдателя, который будет напечатан.

но что происходит, doOnNext выполняется и код никогда не идет дальше. Я имею в виду, что выполнение останавливается на doOnNext даже по истечении 5 секунд.

Код

public static void main(String[] args) {
    Observable<List<Person>> observables = Observable.create(e-> {
        for(List<Person> p : Main.getPersons()) {
            e.onNext(p);
        }
        e.onComplete();
    });
     observables
    //.subscribeOn(Schedulers.newThread())//newThread
    .flatMap(p->Main.toObservable(p.get(0).getName()))
    .doOnNext(p-> System.out.println(p.length()) )
    .map(p->p+"..STRING")
    //.delay(5, TimeUnit.SECONDS)
    //.interval(0, 5, TimeUnit.SECONDS)
    .observeOn(Schedulers.io())
    .subscribe(new Observer() {
        @Override
        public void onComplete() {
            // TODO Auto-generated method stub
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void onNext(Object arg0) {
            // TODO Auto-generated method stub
            System.out.println("onNextFromObserver: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            // TODO Auto-generated method stub
        }
    });
}

private static <T> Observable<T> toObservable(T s) {
    return Observable.just(s);
}

private static List<List<Person>> getPersons() {
    return Arrays.asList(
            Arrays.asList(new Person("Sanna1", 59, "EGY"), new Person("Sanna2", 59, "EGY"), new Person("Sanna3", 59, "EGY")),
            Arrays.asList(new Person("Mohamed1", 59, "EGY"), new Person("Mohamed2", 59, "EGY")),
            Arrays.asList(new Person("Ahmed1", 44, "QTR"), new Person("Ahmed2", 44, "QTR"), new Person("Ahmed3", 44, "QTR")),
                    Arrays.asList(new Person("Fatma", 29, "KSA")),
                    Arrays.asList(new Person("Lobna", 24, "EGY"))
                    );
}

1 Ответ

0 голосов
/ 04 ноября 2018

Вы должны ждать в основном методе. Поместите Thread.sleep(10000) в самый конец метода main(), чтобы у Observable был шанс запустить. Потоки RxJava - это потоки демонов, которые останавливаются, когда поток приложения выпадает из метода main().

public static void main(String[] args) {

    Observable.just("Hello World!", "Keep printing values!")
    .zipWith(Observable.interval(0, 5, TimeUnit.SECONDS), (a, b) -> a)
    .subscribe(v -> 
        System.out.println(Thread.currentThread() + ": " + v)
    );

    Thread.sleep(10000);  // <-----------------------------------

}
...