Наблюдаемый сделать IO и следующий поток на графике IO - PullRequest
0 голосов
/ 16 мая 2018

Я видел странное поведение в RxJava со следующим кодом:

package com.hotels.guestreview.infrastructure.repository;

    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import rx.Observable;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;


    import org.apache.commons.lang.RandomStringUtils;
    import rx.Observable;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;

    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class Main {

        public static void main(String[] args) {
            final Main m = new Main();
            m.run();
        }

        public void run() {
            final List<String> result = Observable.from(new Integer[]{4, 5, 6, 6, 7, 3})
                    .doOnNext(debug("Init"))
                    .flatMap(i -> Observable.defer(() -> toRandomList(i)).subscribeOn(Schedulers.io()))
                    .doOnNext(debug("defer"))
                    .flatMap(this::chooseString)
                    .doOnNext(debug("chooseString"))
                    .toList()
                    .doOnNext(debug("list"))
                    .toBlocking()
                    .single();
            System.out.println("\nresult = " + result);
        }


        public static Observable<List<String>> toRandomList(Integer n) {
            debug("perform IO").call(n);
            try {
                Thread.sleep(new Random().nextInt(3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            debug("IO done").call(n);
            final List<String> result = Stream.iterate(0, t -> t + 1)
                .map(i -> RandomStringUtils.randomAlphanumeric(n))
                .limit(n)
                .collect(Collectors.toList());
            return Observable.just(result);
        }

        public Observable<String> chooseString(List<String> list) {
            // guilty code
            /*
            try {
                Thread.sleep(new Random().nextInt(3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            */
            // end guilty code
            if (Math.random() > .3) {
                return Observable.just(list.get(new Random().nextInt(list.size())));
            }
            else {
                return Observable.empty();
            }
        }

        public static <T> Action1<T> debug(String s) {
            return o -> System.out.println(o + " | " + s + " | " + Thread.currentThread().getName());
        }

    }

Я пытаюсь выполнить метод toRandomList в планировщике io , и все прекрасно работает с прокомментированным кодом , каждый выброс которого и последующий поток toRandomList в отдельной теме. Если я удаляю комментарий (добавляя спящий режим) guilty code в методе chooseString, каждый шаг после toRandomList выполняется в том же потоке.
Почему это происходит? Что я делаю не так?

Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Проблема здесь на плоской карте, должна быть изменена как:

Observable.from(new Integer[]{4, 5, 6, 6, 7, 3})
                .doOnNext(debug("Init"))
                .flatMap(i -> Observable.defer(() -> toRandomList(i))
                      .doOnNext(debug("defer"))
                      .flatMap(this::chooseString)
                      .subscribeOn(Schedulers.io())
                )

Таким образом, весь подпоток, определенный внутри flatMap, в котором он называется subscribeOn, выполняется в потоке выбранного Scheduler.

Тогда, как @Dmitry указал в своем ответе, лучше использовать fromCallable вместо комбинации defer и just / empty

0 голосов
/ 16 мая 2018

Это потому, что вы используете Observable.just для создания своего потока внутри toRandomList.Observable.just создает поток из уже вычисленного значения.Но вам нужно выполнить некоторые вычисления перед возвратом значения, поэтому вам нужно использовать другой оператор.Observable.fromCallable например:

public static Observable<List<String>> toRandomList(Integer n) {
    return Observable.fromCallable(() -> {
        debug("perform IO").call(n);
        try {
            Thread.sleep(new Random().nextInt(3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        debug("IO done").call(n);

        return Stream.iterate(0, t -> t + 1)
                .map(i -> "1")
                .limit(n)
                .collect(Collectors.toList());
    });
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...