Выполняйте параллельные задачи с RxJava и ждите, пока последняя из них не будет завершена - PullRequest
0 голосов
/ 28 января 2019

Ну, это мой первый вопрос в StackOverflow, но после нескольких дней борьбы с RxJava Я не могу найти другое решение, я сам много чего пробовал, копался в документации и других постах, но я неуверен, как именно делать то, что мне нужно.Я пробовал с несколькими комбинациями flatMap, zip, merge и другими, но всегда зашел в тупик, и самым близким достигнутым решением является код ниже.Я был бы признателен за любую помощь или руководство.

Мне нужен метод, который при наличии списка ввода выполняет параллельные вызовы с различными входами списка и не продолжает выполнение, пока не завершатся все параллельные вызовы.Также необходимо сохранить результаты различных выполнений для последующего использования ( EDIT : в том же потоке, который начал выполнение).

public void parallelExecution(List<Integer> calls) {
  List<String> results = new ArrayList<>();
  logger.debug("Starting parallel executions");
  Observable.fromIterable(calls)
      .flatMap(val -> Observable.just(val).observeOn(Schedulers.io())
      .doOnNext(item -> results.add(simpleAsync(item).toString())))
      .subscribe(call -> logger.debug("Execution: {}", Thread.currentThread().getName()));
  logger.debug("Ending parallel executions");
  for (String x : results) {
    logger.debug("Results: {}", x);
  }
}

private Integer simpleAsync(Integer number) {
  Integer result = number * number;
  logger.info("Pre log {}: {}", Thread.currentThread().getName(), result);
  try {
    Thread.sleep(number * 500);
  } catch (Exception e) {
  }
  logger.info("Post log {}: {}", Thread.currentThread().getName(), result);
  return result;
}

Проблема заключается в том, что этот код не «ожидает» выполнения метода «simpleAsync», он завершает выполнение без журналов «Results» (результатов пока нет) и послечто следы "Post log" выполняются в разных потоках:

Starting parallel executions
Ending parallel executions
Pre log RxCachedThreadScheduler-1: 1
Pre log RxCachedThreadScheduler-2: 4
Pre log RxCachedThreadScheduler-3: 9
Pre log RxCachedThreadScheduler-4: 16
Pre log RxCachedThreadScheduler-5: 25
Post log RxCachedThreadScheduler-1: 1
Execution: RxCachedThreadScheduler-1
Post log RxCachedThreadScheduler-2: 4
Execution: RxCachedThreadScheduler-2
Post log RxCachedThreadScheduler-3: 9
Execution: RxCachedThreadScheduler-3
Post log RxCachedThreadScheduler-4: 16
Execution: RxCachedThreadScheduler-4
Post log RxCachedThreadScheduler-5: 25
Execution: RxCachedThreadScheduler-5

Если я удаляю предложение "наблюдаю", метод ожидает завершения вызовов, но они выполняются последовательно (вта же тема):

Starting parallel executions
Pre log Default Executor-thread-9: 1
Post log Default Executor-thread-9: 1
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 4
Post log Default Executor-thread-9: 4
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 9
Post log Default Executor-thread-9: 9
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 16
Post log Default Executor-thread-9: 16
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 25
Post log Default Executor-thread-9: 25
Execution: Default Executor-thread-9
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25

Ответы [ 2 ]

0 голосов
/ 31 января 2019

Я бы предположил, что вы не мыслите достаточно реактивно:

public Single<List<String>> parallelExecution(List<Integer> calls) {
  return Observable
      .fromIterable(calls)
      .flatMap(val -> Observable.fromCallable(() -> simpleAsync(val).toString())
                                .subscribeOn(Schedulers.io())
      .toList();
}
  • .toList() соберет все результаты и предоставит отдельные элементы, когда flatMap завершит
  • вы хотите использовать subscribeOn, а не observeOn.
  • это было бы проще, если бы simpleAsync вернул реактивный объект.
  • , если вам нужно , чтобы parallelExecution не реагировал, используйте что-то вроде blockingGet.
0 голосов
/ 29 января 2019

Вы пробовали использовать zip ?

public void parallelExecution(List<Integer> calls) {

    logger.debug("Starting parallel executions");

    // Create an iterable observables
    List<Observable<Integer>> observables = calls.stream()
            .map(i -> {
                return Observable.fromCallable(() -> simpleAsync(i))
                        .subscribeOn(Schedulers.newThread());
            })
            .collect(Collectors.toList());


    Observable.zip(observables, objects -> { // Zip observables
                return Arrays.stream(objects)
                        .map(Object::toString)
                        .collect(Collectors.toList());
            })
            .doOnNext(results -> logger.debug("Ending parallel executions"))
            .subscribe(results -> { // Subscribe to the result.
                // Put your code that needs to "wait"
                for (String x : results) {
                    logger.debug("Results: {}", x);
                }
            });
}

Результат будет выглядеть примерно так:

Starting parallel executions
Pre log RxNewThreadScheduler-3: 9
Pre log RxNewThreadScheduler-1: 1
Pre log RxNewThreadScheduler-2: 4
Pre log RxNewThreadScheduler-4: 16
Pre log RxNewThreadScheduler-5: 25
Post log RxNewThreadScheduler-1: 1
Post log RxNewThreadScheduler-2: 4
Post log RxNewThreadScheduler-3: 9
Post log RxNewThreadScheduler-4: 16
Post log RxNewThreadScheduler-5: 25
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25

РЕДАКТИРОВАТЬ: Вы можете изменить поток, который вы хотите прослушать результат, используя observeOn,Например, если вы хотите подписаться на вызывающий поток, вы можете изменить код на что-то вроде этого (см. эти SO ответы):

final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
logger.debug("Starting parallel executions");

// Create an iterable observables
List<Observable<Integer>> observables = calls.stream()
        .map(i -> {
            return Observable.fromCallable(() -> simpleAsync(i))
                    .subscribeOn(Schedulers.newThread());
        })
        .collect(Collectors.toList());


Observable.zip(observables, objects -> { // Zip observables
            return Arrays.stream(objects)
                    .map(Object::toString)
                    .collect(Collectors.toList());
        })
        .doOnNext(results -> logger.debug("Ending parallel executions"))
        .observeOn(Schedulers.from(tasks::add)) // Add a scheduler with executor from the current thread
        .subscribe(results -> { // Subscribe to the result.
            // Put your code that needs to "wait"
            for (String x : results) {
                logger.debug("Results: {}", x);
            }
        });

try {
    tasks.take().run();
} catch (InterruptedException e) {
    e.printStackTrace();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...