Я использую RxJava в веб-сервисе для создания нескольких наблюдаемых, каждая из которых делает что-то во время своих функций onNext и onError.Я хочу подождать, пока все эти наблюдаемые не завершат свою работу, прежде чем возвращать ответ клиенту, но я также хочу, чтобы эти задачи выполнялись параллельно
Я объединил все из них, используя:
Observable<List<Obj>> mergedObs = Observable.mergeDelayError(tasks)
Затем я конвертирую это в блокировку, наблюдаемую с помощью
mergedObs.toList().toBlocking.subscribe(...)
Однако я заметил, что хотя объединенная наблюдаемая ждет, пока все задачи вызовут метод subscribe ()он не ожидает выполнения задач в функции подписки.Вот некоторый пример кода с пропущенными несущественными деталями:
public void doWork() {
Observable<Obj> mergedTasks = getTasks();
mergedTasks.toList().toBlocking().subscribe(
results -> LOG.info("Done!)
);
return; // eventually returns a web response
}
private Observable<Obj> getTasks() {
List<Observable<Obj>> tasks = new ArrayList<>();
Observable<Obj> task1 = getTask1();
Observable<Obj> task2 = getTask2();
tasks.add(task1);
tasks.add(task2);
// Both tasks execute in parallel
tasks.toList().subscribe(
result -> processSearch(), // this can execute after the service returns a response!
exception -> handleException() // this can execute after the service returns a response!
);
return Observable.mergeDelayError(tasks);
}
private Observable<Obj> getTask1() {
// implementation detail
}
private Observable<Obj> getTask2() {
// implementation detail
}
Хотя этот код ожидает, пока обе задачи вызовут метод subscribe (), он не ожидает работу, выполненную в onSuccess.или onError.