Как реализовать вложенный асинхронный код с реактивным программированием? - PullRequest
1 голос
/ 26 апреля 2019

Я очень плохо знаком с реактивным программированием. Хотя я хорошо знаком с функциональным программированием и сопрограммами kotlin, я все еще не могу понять, как использовать парадигмы реактивного программирования для рефакторинга простого вложенного кода CRUD, особенно с вложенными асинхронными операциями.

Например, ниже приведен простой фрагмент асинхронного кода CRUD, основанный на Java 8 CompletableFuture


        getFooAsync(id)
                .thenAccept(foo -> {
                    if (foo == null) {
                        insertFooAsync(id, new Foo());
                    } else {
                        getBarAsync(foo.bar)
                                .thenAccept(bar -> {
                                   updateBarAsync(foo, bar);
                                });
                    }
                });

Рефакторинг его с сопрограммами kotlin очень прост, что делает его более читабельным без потери асинхронности.

 val foo = suspendGetFoo(id)
 if(foo==null) {
   suspendInsertFoo(id, Foo())
 } else {
   val bar = suspendGetBar(foo.bar)
   suspendUpdateBar(foo, bar);-
}

Однако подходит ли подобный код для реактивного программирования?

Если это так, учитывая Flux<String> idFlux, как провести рефакторинг с помощью Reactor 3?

Это хорошая идея, чтобы просто заменить каждый CompletableFuture на Mono?

1 Ответ

8 голосов
/ 26 апреля 2019

подходит ли такой код для реактивного программирования?

ИМХО, сопрограммы Kotlin намного лучше подходят для этого варианта использования, и в результате получается намного более чистый код.

Однако вы можете делать это в реактивных потоках.

Является ли хорошей идеей просто заменить каждое CompletableFuture на Mono?

Я обнаружил, что реактивные потоки очень хорошо справляются со многими асинхронными сценариями использования (например, примеры из проектного реактора ). Тем не менее, есть определенные варианты использования, которые не совсем подходят. Поэтому я не могу рекомендовать политику замены каждые CompletableFuture реактивными потоками.

Однако, один случай, для которого вы должны переключиться с CompletableFuture, это когда вам нужно противодавление.

Решение о том, какой асинхронный шаблон использовать, зависит от того, какие языки / рамки / инструменты / библиотеки вы используете, и насколько вам и вашим товарищам по команде комфортно с ними. Если вы используете библиотеки с хорошей поддержкой Kotlin, и ваша команда знакома с Kotlin, то используйте сопрограммы. Аналогично для реактивных потоков.

с учетом Flux<String> idFlux, как провести рефакторинг с помощью Reactor 3?

Вот некоторые вещи, которые следует иметь в виду при рассмотрении реактивных потоков для этого варианта использования:

  1. Реактивные потоки не могут излучать null. Вместо этого обычно используется пустой Mono. (Технически вы также можете использовать Mono<Optional<...>>, но в этот момент вы просто повреждаете свой мозг и просите об ошибках)
  2. Когда Mono пусто, лямбда, передаваемая любому оператору, который имеет дело с сигналом onNext (например, .map, .flatMap, .handle и т. Д.), не вызывается . Помните, что вы имеете дело с потоком данных (а не с потоком обязательного управления)
  3. Операторы .switchIfEmpty или .defaultIfEmpty могут работать при пустых Mono с. Однако они не обеспечивают условия else. Нижестоящие операторы не знают, что поток ранее был пустым (если элемент, излучаемый издателем, переданный в .switchIfEmpty, не может быть легко идентифицирован)
  4. Если у вас есть поток из множества операторов, и из-за нескольких операторов поток может стать пустым, то для последующих операторов трудно / невозможно определить , почему поток стал пустым.
  5. Основными асинхронными операторами, которые позволяют обрабатывать передаваемые значения от вышестоящих операторов, являются .flatMap, .flatMapSequential и .concatMap. Вам нужно будет использовать их для объединения асинхронных операций, которые работают с выходными данными предыдущих асинхронных операций.
  6. Поскольку ваш вариант использования не возвращает значение, реализация реактивного потока вернет Mono<Void>

Сказав все это, вот попытка преобразовать ваш пример в реактор 3 (с некоторыми оговорками):

    Mono<Void> updateFoos(Flux<String> idFlux) {
        return idFlux                                         // Flux<String>
            .flatMap(id -> getFoo(id)                         // Mono<Foo>
                /*
                 * If a Foo with the given id is not found,
                 * create a new one, and continue the stream with it.
                 */
                .switchIfEmpty(insertFoo(id, new Foo()))      // Mono<Foo>
                /*
                 * Note that this is not an "else" condition
                 * to the above .switchIfEmpty
                 *
                 * The lambda passed to .flatMap will be
                 * executed with either:
                 * A) The foo found from getFoo
                 *    OR
                 * B) the newly inserted Foo from insertFoo
                 */
                .flatMap(foo -> getBar(foo.bar)               // Mono<Bar>
                    .flatMap(bar -> updateBar(foo, bar))      // Mono<Bar>
                    .then()                                   // Mono<Void>
                )                                             // Mono<Void>
            )                                                 // Flux<Void>
            .then();                                          // Mono<Void>
    }

    /*
     * @return the Foo with the given id, or empty if not found
     */
    abstract Mono<Foo> getFoo(String id);

    /*
     * @return the Bar with the given id, or empty if not found
     */
    abstract Mono<Bar> getBar(String id);

    /*
     * @return the Foo inserted, never empty
     */
    abstract Mono<Foo> insertFoo(String id, Foo foo);

    /*
     * @return the Bar updated, never empty
     */
    abstract Mono<Bar> updateBar(Foo foo, Bar bar);

и вот более сложный пример, который использует Tuple2<Foo,Boolean>, чтобы указать, был ли найден оригинальный Foo (это должно быть семантически эквивалентно вашему примеру):

    Mono<Void> updateFoos(Flux<String> idFlux) {
        return idFlux                                         // Flux<String>
            .flatMap(id -> getFoo(id)                         // Mono<Foo>
                /*
                 * Map to a Tuple2 whose t2 indicates whether the foo was found.
                 * In this case, it was found.
                 */
                .map(foo -> Tuples.of(foo, true))             // Mono<Tuple2<Foo,Boolean>>
                /*
                 * If a Foo with the given id is not found,
                 * create a new one, and continue the stream with 
                 * a Tuple2 indicating it wasn't originally found
                 */
                .switchIfEmpty(insertFoo(id, new Foo())       // Mono<Foo>
                    /*
                     * Foo was not originally found, so t2=false
                     */
                    .map(foo -> Tuples.of(foo, false)))       // Mono<Tuple2<Foo,Boolean>>
                /*
                 * The lambda passed to .flatMap will be
                 * executed with either:
                 * A) t1=foo found from getFoo, t2=true
                 *    OR
                 * B) t1=newly inserted Foo from insertFoo, t2=false
                 */
                .flatMap(tuple2 -> tuple2.getT2()
                    // foo originally found 
                    ? getBar(tuple2.getT1().bar)              // Mono<Bar>
                        .flatMap(bar -> updateBar(tuple2.getT1(), bar)) // Mono<Bar>
                        .then()                               // Mono<Void>
                    // foo originally not found (new inserted)
                    : Mono.empty()                            // Mono<Void>
                )
            )                                                 // Flux<Void>
            .then();                                          // Mono<Void>
    }

...