Эквивалент VertX CompositeFuture в RxJava - PullRequest
0 голосов
/ 03 мая 2018

Пример VertX, когда вам нужно запросить несколько асинхронных ресурсов и использовать их все в одной операции:

Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture.completer());

Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.completer());

CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
  if (ar.succeeded()) {
    // All servers started
  } else {
    // At least one server failed
  }
});

Нам нужно запросить две разные базы данных, а затем использовать результаты в бизнес-логике, но поток эквивалентен.

Что такое эквивалент VertX / RxJava?

В настоящее время люди делают это, вкладывая новый вызов .flatMap () каждый раз, когда им нужна новая переменная. Я чувствую, что должен быть лучший способ ...

На самом деле нам не нужно, чтобы запросы были параллельными, но нам нужно кэшировать оба результата и одновременно передавать их бизнес-логике.

1 Ответ

0 голосов
/ 04 мая 2018

Есть много способов сделать это, но я попытался выбрать подход, который тесно связан с вашим образцом:

@Override
public void start(Future<Void> startFuture) throws Exception {
    final HttpServer httpServer = vertx.createHttpServer();
    final Completable initializeHttpServer = httpServer.rxListen().toCompletable();

    final NetServer netServer = vertx.createNetServer();
    final Completable initializeNetServer = netServer.rxListen().toCompletable();

    initializeHttpServer.andThen(initializeNetServer)
        .subscribe(
            ()    -> { /* All servers started */ },
            error -> { /* At least one server failed */ }
        );
}

вызовы rxListen() преобразуются в Completable экземпляры, которые затем запускаются последовательно по подписке.

  • обратный вызов onComplete абонента будет вызван, когда оба сервера будут привязаны к соответствующим портам, или ...
  • обратный вызов onError будет вызван, если произойдет исключение

(также, fwiw, «вложение» flatMap операций для чего-то столь же тривиального, как это не должно быть необходимым. Однако «связывание» таких операций было бы идиоматическим использованием).

надеюсь, что это поможет!

- ОБНОВЛЕНИЕ -

Внимательно прочитав вопрос, теперь я вижу, что вы на самом деле спрашивали о том, как обрабатывать результаты двух дискретных асинхронных операций.

альтернативой flatMap для комбинирования результатов будет использование оператора zip, например:

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        final Single<String> dbQuery1 = Single.fromCallable(() -> { return "db-query-result-1"; });
        final Single<String> dbQuery2 = Single.fromCallable(() -> { return "db-query-result-2"; });

        Single.zip(dbQuery1, dbQuery2, (result1, result2) -> {
            // handle the results from both db queries
            // (with Pair being a standard tuple-like class)
           return new Pair(result1, result2);
        })
            .subscribe(
                pair -> {
                    // handle the results
                },
                error -> {
                    // something went wrong
                }
            );
    }

в документах , zip позволяет вам указать серию реактивных типов (Single, Observable и т. Д.) Вместе с функцией для преобразования всех результатов одновременно, с Основная идея заключается в том, что он не будет излучать ничего, пока все источники не будут излучать один раз (или больше, в зависимости от типа реактивности).

...