Ну, это мой первый вопрос в StackOverflow, но после нескольких дней борьбы с RxJava
Я не могу найти другое решение, я сам много чего пробовал, копался в документации и других постах, но я неуверен, как именно делать то, что мне нужно.Я пробовал с несколькими комбинациями flatMap
, zip
, merge
и другими, но всегда зашел в тупик, и самым близким достигнутым решением является код ниже.Я был бы признателен за любую помощь или руководство.
Мне нужен метод, который при наличии списка ввода выполняет параллельные вызовы с различными входами списка и не продолжает выполнение, пока не завершатся все параллельные вызовы.Также необходимо сохранить результаты различных выполнений для последующего использования ( EDIT : в том же потоке, который начал выполнение).
public void parallelExecution(List<Integer> calls) {
List<String> results = new ArrayList<>();
logger.debug("Starting parallel executions");
Observable.fromIterable(calls)
.flatMap(val -> Observable.just(val).observeOn(Schedulers.io())
.doOnNext(item -> results.add(simpleAsync(item).toString())))
.subscribe(call -> logger.debug("Execution: {}", Thread.currentThread().getName()));
logger.debug("Ending parallel executions");
for (String x : results) {
logger.debug("Results: {}", x);
}
}
private Integer simpleAsync(Integer number) {
Integer result = number * number;
logger.info("Pre log {}: {}", Thread.currentThread().getName(), result);
try {
Thread.sleep(number * 500);
} catch (Exception e) {
}
logger.info("Post log {}: {}", Thread.currentThread().getName(), result);
return result;
}
Проблема заключается в том, что этот код не «ожидает» выполнения метода «simpleAsync», он завершает выполнение без журналов «Results» (результатов пока нет) и послечто следы "Post log" выполняются в разных потоках:
Starting parallel executions
Ending parallel executions
Pre log RxCachedThreadScheduler-1: 1
Pre log RxCachedThreadScheduler-2: 4
Pre log RxCachedThreadScheduler-3: 9
Pre log RxCachedThreadScheduler-4: 16
Pre log RxCachedThreadScheduler-5: 25
Post log RxCachedThreadScheduler-1: 1
Execution: RxCachedThreadScheduler-1
Post log RxCachedThreadScheduler-2: 4
Execution: RxCachedThreadScheduler-2
Post log RxCachedThreadScheduler-3: 9
Execution: RxCachedThreadScheduler-3
Post log RxCachedThreadScheduler-4: 16
Execution: RxCachedThreadScheduler-4
Post log RxCachedThreadScheduler-5: 25
Execution: RxCachedThreadScheduler-5
Если я удаляю предложение "наблюдаю", метод ожидает завершения вызовов, но они выполняются последовательно (вта же тема):
Starting parallel executions
Pre log Default Executor-thread-9: 1
Post log Default Executor-thread-9: 1
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 4
Post log Default Executor-thread-9: 4
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 9
Post log Default Executor-thread-9: 9
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 16
Post log Default Executor-thread-9: 16
Execution: Default Executor-thread-9
Pre log Default Executor-thread-9: 25
Post log Default Executor-thread-9: 25
Execution: Default Executor-thread-9
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
Results: 25