Асинхронный forEach над массивом с помощью RxJava - PullRequest
1 голос
/ 19 апреля 2019

Я пытаюсь перебрать массив карт и выполнить некоторые асинхронные действия. Я попробовал несколько вещей, используя библиотеку RxJava, но все, что я пробовал, кажется синхронным. Я пытаюсь избежать создания новых потоков вручную и хочу, чтобы RxJava обрабатывал это. Это то, что я пробовал до сих пор.

Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread()) 
            .observeOn(Schedulers.computation())
            .forEach(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.computation())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

Когда я запускаю модульные тесты с кодом выше, я вижу следующий вывод.

1
2
1
2
1
2
...

То, что я хочу увидеть, это

1
1
1
... 
2
2
2

Как выполнить асинхронную итерацию по массиву Map с использованием RxJava?

Ответы [ 2 ]

2 голосов
/ 19 апреля 2019

Вы можете изменить его с наблюдаемого на текучее и использовать параллельное:

        Flowable.fromIterable(array)
                .parallel(3) // number of items in parallel
                .runOn(Schedulers.newThread()) // the desired scheduler
                .map(item -> {
                    try {
                        System.out.println(1);
                        Thread.sleep(3000);
                        System.out.println(2);
                    } catch (Exception e) {

                    }

                    return Completable.complete();
                })
        .sequential().subscribe();
1 голос
/ 19 апреля 2019

Если вы застряли в RxJava 1.x, у вас не будет доступа к классу Flowable. Это был не мой случай, но что-то вроде приведенного ниже кода может выполнять параллельные действия. Есть больше вложений, но это работает.

    final ExecutorService executor = Executors.newFixedThreadPool(2);
    List<String> iterableList = new ArrayList<>();
    iterableList.add("one");
    iterableList.add("two");
    iterableList.add("three");
    iterableList.add("4");
    iterableList.add("5");
    iterableList.add("6");
    iterableList.add("7");
    iterableList.add("8");
    iterableList.add("9");
    Observable.from(iterableList)
            .flatMap(val -> Observable.just(val)
                    .subscribeOn(Schedulers.from(executor))
                    .doOnNext(numString -> {
                        try {
                            System.out.println(1);
                            Thread.sleep(500);
                            System.out.println(2);
                        } catch (Exception ex) {
                        }
                    })
            )
            .subscribe();
...