Дождитесь завершения нескольких наблюдаемых RxJava - PullRequest
0 голосов
/ 26 апреля 2018

Я использую RxJava в веб-сервисе для создания нескольких наблюдаемых, каждая из которых делает что-то во время своих функций onNext и onError.Я хочу подождать, пока все эти наблюдаемые не завершат свою работу, прежде чем возвращать ответ клиенту, но я также хочу, чтобы эти задачи выполнялись параллельно

Я объединил все из них, используя:

Observable<List<Obj>> mergedObs = Observable.mergeDelayError(tasks)

Затем я конвертирую это в блокировку, наблюдаемую с помощью

mergedObs.toList().toBlocking.subscribe(...)

Однако я заметил, что хотя объединенная наблюдаемая ждет, пока все задачи вызовут метод subscribe ()он не ожидает выполнения задач в функции подписки.Вот некоторый пример кода с пропущенными несущественными деталями:

public void doWork() {
    Observable<Obj> mergedTasks = getTasks();
    mergedTasks.toList().toBlocking().subscribe(
        results -> LOG.info("Done!)
    );

    return; // eventually returns a web response
}

private Observable<Obj> getTasks() {
    List<Observable<Obj>> tasks = new ArrayList<>();
    Observable<Obj> task1 = getTask1();
    Observable<Obj> task2 = getTask2();
    tasks.add(task1);
    tasks.add(task2);

    // Both tasks execute in parallel
    tasks.toList().subscribe(
        result -> processSearch(), // this can execute after the service returns a response!
        exception -> handleException() // this can execute after the service returns a response!
    );

    return Observable.mergeDelayError(tasks);
}

private Observable<Obj> getTask1() {
    // implementation detail
}

private Observable<Obj> getTask2() {
    // implementation detail
}

Хотя этот код ожидает, пока обе задачи вызовут метод subscribe (), он не ожидает работу, выполненную в onSuccess.или onError.

...