Существует несколько способов, которые можно рассматривать как удобный способ создания исключений:
Для обработки вашего элемента используйте Flux/Mono.handle
Один из способов, который может упростить обработкуЭлемент, который может привести к ошибке или пустому потоку, является оператором handle
.
. Следующий код показывает, как мы можем использовать его для решения нашей проблемы:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
, как мы можемвидите, оператору .handle
требуется передать BiConsumer<T, SynchronousSink<>
для обработки элемента.Здесь мы должны параметры в нашем BiConsumer.Первый - это элемент из вышестоящего потока, второй - SynchronousSink
, который помогает нам синхронно поставлять элемент в нисходящий поток.Такая техника расширяет возможности предоставления различных результатов обработки нашего элемента.Например, если элемент недействителен, мы можем передать ошибку в тот же SycnchronousSync
, который отменит восходящий поток и выдаст сигнал onError
в нисходящий поток.В свою очередь, мы можем «фильтровать», используя тот же оператор handle
.Как только дескриптор BiConsumer
будет выполнен и элемент не будет предоставлен, Reactor сочтет это своего рода фильтрацией и запросит для нас дополнительный элемент.Наконец, в случае, если элемент действителен, мы можем просто вызвать SynchronousSink#next
и распространить наш элемент вниз по потоку или применить к нему некоторое отображение, поэтому мы будем иметь handle
в качестве оператора map
здесь.Более того, мы можем безопасно использовать этот оператор без влияния на производительность и обеспечить сложную проверку элемента, такую как проверка элемента или отправка ошибки в нисходящий поток.
Броски с использованием #concatMap
+ Mono.error
Один из вариантов выброса исключения во время отображения - заменить map
на concatMap
.По своей сути concatMap
делает почти то же самое, что flatMap
.Разница лишь в том, что concatMap
допускает только один подпоток за раз.Такое поведение значительно упрощает внутреннюю реализацию и не влияет на производительность.Таким образом, мы можем использовать следующий код, чтобы вызвать исключение более функциональным способом:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
В приведенном выше примере для недопустимого пользователя мы возвращаем исключение, используя Mono.error
.То же самое мы можем сделать для потока, используя Flux.error
:
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
Примечание , в обоих случаях мы возвращаем cold stream, который имеет только один элемент.В Reactor есть несколько оптимизаций, которые повышают производительность в случае, если возвращаемый поток является холодным скалярным потоком.Таким образом, рекомендуется использовать Flux / Mono concatMap
+ .just
, empty
, error
в результате, когда нам нужно более сложное отображение, которое может закончиться return null
или throw new ...
.
Внимание!Никогда не проверяйте входящий элемент на обнуляемость.Проект Reactor никогда не отправит вам значение null
, поскольку эта спецификация Reactive Streams насилия (см. Правило 2.13 ) Таким образом, в случае, если repo.findById
вернет ноль, Reactor выдаст исключение NPE дляВы.
Подождите, почему concatMap
лучше, чем flatMap
?
По своей сути, flatMap
предназначен для объединения элементов из нескольких подпотоков, которые выполняются одновременно.Это означает, что в flatMap должны быть асинхронные потоки, поэтому они могут обрабатывать данные в нескольких потоках или это могут быть несколько сетевых вызовов.Впоследствии такие ожидания сильно влияют на реализацию, поэтому flatMap
должен иметь возможность обрабатывать данные из нескольких потоков (Thread
s) (означает использование параллельных структур данных), ставить элементы в очередь, если происходит слив из другого потока (означает дополнительныевыделение памяти на Queue
s для каждого подпотока) и не нарушать правила спецификации Reactive Streams (означает действительно сложную реализацию).Считая все эти факты и тот факт, что мы заменяем простую операцию map
(которая является синхронной) на более удобный способ генерирования исключения с использованием Flux/Mono.error
(который не изменяет синхронность выполнения), приводит к тому, что мы делаемнам не нужен такой сложный оператор, и мы можем использовать гораздо более простой concatMap
, который предназначен для асинхронной обработки одного потока за раз и имеет пару оптимизаций для обработки скалярного, холодного потока.
Выдает исключение, используя switchOnEmpty
Итак, еще один подход к созданию исключения, когда результат пуст, - это оператор switchOnEmpty
.Следующий код демонстрирует, как мы можем использовать этот подход:
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
Как мы видим, в этом случае repo::findById
должно иметь Mono
из User
в качестве возвращаемого типа.Поэтому, если экземпляр User
не будет найден, поток результатов будет пустым.Таким образом, Reactor вызовет альтернативный Mono
, указанный как switchIfEmpty
параметр.
Бросьте ваше исключение как
Это может быть расценено как менее читаемый код или плохая практика, но выможет бросить ваше исключение как есть.Этот шаблон нарушает спецификацию Reactive Streams, но реактор поймает выброшенное для вас исключение и передаст его в виде сигнала onError
вашему нисходящему потоку
Takeaways
- Используйте оператор
.handle
в порядкечтобы обеспечить сложную обработку элементов - Используйте
concatMap
+ Mono.error
, когда нам нужно вызвать исключение во время отображения, но такой метод наиболее подходит для случаев асинхронной обработки элементов. - Использование
flatMap
+ Mono.error
когда у нас уже есть flatMap
на месте Null
, так как тип возврата запрещен, поэтому вместо null
в нисходящем потоке map
вы получите неожиданное onError
с NullPointerException
- Используйте
switchIfEmpty
во всех случаях, когда вам нужно отправить сигнал об ошибке, если результат вызова какой-то конкретной функции завершился с empty stream