подходит ли такой код для реактивного программирования?
ИМХО, сопрограммы Kotlin намного лучше подходят для этого варианта использования, и в результате получается намного более чистый код.
Однако вы можете делать это в реактивных потоках.
Является ли хорошей идеей просто заменить каждое CompletableFuture на Mono?
Я обнаружил, что реактивные потоки очень хорошо справляются со многими асинхронными сценариями использования (например, примеры из проектного реактора ). Тем не менее, есть определенные варианты использования, которые не совсем подходят. Поэтому я не могу рекомендовать политику замены каждые CompletableFuture
реактивными потоками.
Однако, один случай, для которого вы должны переключиться с CompletableFuture
, это когда вам нужно противодавление.
Решение о том, какой асинхронный шаблон использовать, зависит от того, какие языки / рамки / инструменты / библиотеки вы используете, и насколько вам и вашим товарищам по команде комфортно с ними. Если вы используете библиотеки с хорошей поддержкой Kotlin, и ваша команда знакома с Kotlin, то используйте сопрограммы. Аналогично для реактивных потоков.
с учетом Flux<String> idFlux
, как провести рефакторинг с помощью Reactor 3?
Вот некоторые вещи, которые следует иметь в виду при рассмотрении реактивных потоков для этого варианта использования:
- Реактивные потоки не могут излучать
null
. Вместо этого обычно используется пустой Mono
. (Технически вы также можете использовать Mono<Optional<...>>
, но в этот момент вы просто повреждаете свой мозг и просите об ошибках)
- Когда
Mono
пусто, лямбда, передаваемая любому оператору, который имеет дело с сигналом onNext
(например, .map
, .flatMap
, .handle
и т. Д.), не вызывается . Помните, что вы имеете дело с потоком данных (а не с потоком обязательного управления)
- Операторы
.switchIfEmpty
или .defaultIfEmpty
могут работать при пустых Mono
с. Однако они не обеспечивают условия else
. Нижестоящие операторы не знают, что поток ранее был пустым (если элемент, излучаемый издателем, переданный в .switchIfEmpty
, не может быть легко идентифицирован)
- Если у вас есть поток из множества операторов, и из-за нескольких операторов поток может стать пустым, то для последующих операторов трудно / невозможно определить , почему поток стал пустым.
- Основными асинхронными операторами, которые позволяют обрабатывать передаваемые значения от вышестоящих операторов, являются
.flatMap
, .flatMapSequential
и .concatMap
. Вам нужно будет использовать их для объединения асинхронных операций, которые работают с выходными данными предыдущих асинхронных операций.
- Поскольку ваш вариант использования не возвращает значение, реализация реактивного потока вернет
Mono<Void>
Сказав все это, вот попытка преобразовать ваш пример в реактор 3 (с некоторыми оговорками):
Mono<Void> updateFoos(Flux<String> idFlux) {
return idFlux // Flux<String>
.flatMap(id -> getFoo(id) // Mono<Foo>
/*
* If a Foo with the given id is not found,
* create a new one, and continue the stream with it.
*/
.switchIfEmpty(insertFoo(id, new Foo())) // Mono<Foo>
/*
* Note that this is not an "else" condition
* to the above .switchIfEmpty
*
* The lambda passed to .flatMap will be
* executed with either:
* A) The foo found from getFoo
* OR
* B) the newly inserted Foo from insertFoo
*/
.flatMap(foo -> getBar(foo.bar) // Mono<Bar>
.flatMap(bar -> updateBar(foo, bar)) // Mono<Bar>
.then() // Mono<Void>
) // Mono<Void>
) // Flux<Void>
.then(); // Mono<Void>
}
/*
* @return the Foo with the given id, or empty if not found
*/
abstract Mono<Foo> getFoo(String id);
/*
* @return the Bar with the given id, or empty if not found
*/
abstract Mono<Bar> getBar(String id);
/*
* @return the Foo inserted, never empty
*/
abstract Mono<Foo> insertFoo(String id, Foo foo);
/*
* @return the Bar updated, never empty
*/
abstract Mono<Bar> updateBar(Foo foo, Bar bar);
и вот более сложный пример, который использует Tuple2<Foo,Boolean>
, чтобы указать, был ли найден оригинальный Foo (это должно быть семантически эквивалентно вашему примеру):
Mono<Void> updateFoos(Flux<String> idFlux) {
return idFlux // Flux<String>
.flatMap(id -> getFoo(id) // Mono<Foo>
/*
* Map to a Tuple2 whose t2 indicates whether the foo was found.
* In this case, it was found.
*/
.map(foo -> Tuples.of(foo, true)) // Mono<Tuple2<Foo,Boolean>>
/*
* If a Foo with the given id is not found,
* create a new one, and continue the stream with
* a Tuple2 indicating it wasn't originally found
*/
.switchIfEmpty(insertFoo(id, new Foo()) // Mono<Foo>
/*
* Foo was not originally found, so t2=false
*/
.map(foo -> Tuples.of(foo, false))) // Mono<Tuple2<Foo,Boolean>>
/*
* The lambda passed to .flatMap will be
* executed with either:
* A) t1=foo found from getFoo, t2=true
* OR
* B) t1=newly inserted Foo from insertFoo, t2=false
*/
.flatMap(tuple2 -> tuple2.getT2()
// foo originally found
? getBar(tuple2.getT1().bar) // Mono<Bar>
.flatMap(bar -> updateBar(tuple2.getT1(), bar)) // Mono<Bar>
.then() // Mono<Void>
// foo originally not found (new inserted)
: Mono.empty() // Mono<Void>
)
) // Flux<Void>
.then(); // Mono<Void>
}