Правильный способ создания исключений с помощью Reactor - PullRequest
0 голосов
/ 03 декабря 2018

Я новичок в проекте Reactor и реактивном программировании в целом.

В настоящее время я работаю над фрагментом кода, похожим на этот:

Mono.just(userId)
    .map(repo::findById)
    .map(user-> {
        if(user == null){
            throw new UserNotFoundException();
        }
        return user;
    })
    // ... other mappings

Этот пример, вероятно, глуп, и, конечно, есть более эффективные способы реализации этого случая, но суть в следующем:

Неправильно ли использовать исключение throw new в блоке map, или я должен заменить его наa return Mono.error(new UserNotFoundException())?

Есть ли реальная разница в этих двух способах ведения?

1 Ответ

0 голосов
/ 03 декабря 2018

Существует несколько способов, которые можно рассматривать как удобный способ создания исключений:

Для обработки вашего элемента используйте Flux/Mono.handle

Один из способов, который может упростить обработкуЭлемент, который может привести к ошибке или пустому потоку, является оператором handle.

. Следующий код показывает, как мы можем использовать его для решения нашей проблемы:

Mono.just(userId)
    .map(repo::findById)
    .handle((user, sink) -> {
        if(!isValid(user)){
            sink.error(new InvalidUserException());
        } else if (isSendable(user))
            sink.next(user);
        }
        else {
            //just ignore element
        }
    })

, как мы можемвидите, оператору .handle требуется передать BiConsumer<T, SynchronousSink<> для обработки элемента.Здесь мы должны параметры в нашем BiConsumer.Первый - это элемент из вышестоящего потока, второй - SynchronousSink, который помогает нам синхронно поставлять элемент в нисходящий поток.Такая техника расширяет возможности предоставления различных результатов обработки нашего элемента.Например, если элемент недействителен, мы можем передать ошибку в тот же SycnchronousSync, который отменит восходящий поток и выдаст сигнал onError в нисходящий поток.В свою очередь, мы можем «фильтровать», используя тот же оператор handle.Как только дескриптор BiConsumer будет выполнен и элемент не будет предоставлен, Reactor сочтет это своего рода фильтрацией и запросит для нас дополнительный элемент.Наконец, в случае, если элемент действителен, мы можем просто вызвать SynchronousSink#next и распространить наш элемент вниз по потоку или применить к нему некоторое отображение, поэтому мы будем иметь handle в качестве оператора map здесь.Более того, мы можем безопасно использовать этот оператор без влияния на производительность и обеспечить сложную проверку элемента, такую ​​как проверка элемента или отправка ошибки в нисходящий поток.

Броски с использованием #concatMap + Mono.error

Один из вариантов выброса исключения во время отображения - заменить map на concatMap.По своей сути concatMap делает почти то же самое, что flatMap.Разница лишь в том, что concatMap допускает только один подпоток за раз.Такое поведение значительно упрощает внутреннюю реализацию и не влияет на производительность.Таким образом, мы можем использовать следующий код, чтобы вызвать исключение более функциональным способом:

Mono.just(userId)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Mono.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

В приведенном выше примере для недопустимого пользователя мы возвращаем исключение, используя Mono.error.То же самое мы можем сделать для потока, используя Flux.error:

Flux.just(userId1, userId2, userId3)
    .map(repo::findById)
    .concatMap(user-> {
        if(!isValid(user)){
            return Flux.error(new InvalidUserException());
        }
        return Mono.just(user);
    })

Примечание , в обоих случаях мы возвращаем cold stream, который имеет только один элемент.В Reactor есть несколько оптимизаций, которые повышают производительность в случае, если возвращаемый поток является холодным скалярным потоком.Таким образом, рекомендуется использовать Flux / Mono concatMap + .just, empty, error в результате, когда нам нужно более сложное отображение, которое может закончиться return null или throw new ....

Внимание!Никогда не проверяйте входящий элемент на обнуляемость.Проект Reactor никогда не отправит вам значение null, поскольку эта спецификация Reactive Streams насилия (см. Правило 2.13 ) Таким образом, в случае, если repo.findById вернет ноль, Reactor выдаст исключение NPE дляВы.

Подождите, почему concatMap лучше, чем flatMap?

По своей сути, flatMap предназначен для объединения элементов из нескольких подпотоков, которые выполняются одновременно.Это означает, что в flatMap должны быть асинхронные потоки, поэтому они могут обрабатывать данные в нескольких потоках или это могут быть несколько сетевых вызовов.Впоследствии такие ожидания сильно влияют на реализацию, поэтому flatMap должен иметь возможность обрабатывать данные из нескольких потоков (Thread s) (означает использование параллельных структур данных), ставить элементы в очередь, если происходит слив из другого потока (означает дополнительныевыделение памяти на Queue s для каждого подпотока) и не нарушать правила спецификации Reactive Streams (означает действительно сложную реализацию).Считая все эти факты и тот факт, что мы заменяем простую операцию map (которая является синхронной) на более удобный способ генерирования исключения с использованием Flux/Mono.error (который не изменяет синхронность выполнения), приводит к тому, что мы делаемнам не нужен такой сложный оператор, и мы можем использовать гораздо более простой concatMap, который предназначен для асинхронной обработки одного потока за раз и имеет пару оптимизаций для обработки скалярного, холодного потока.

Выдает исключение, используя switchOnEmpty

Итак, еще один подход к созданию исключения, когда результат пуст, - это оператор switchOnEmpty.Следующий код демонстрирует, как мы можем использовать этот подход:

Mono.just(userId)
    .flatMap(repo::findById)
    .switchIfEmpty(Mono.error(new UserNotFoundExeception()))

Как мы видим, в этом случае repo::findById должно иметь Mono из User в качестве возвращаемого типа.Поэтому, если экземпляр User не будет найден, поток результатов будет пустым.Таким образом, Reactor вызовет альтернативный Mono, указанный как switchIfEmpty параметр.

Бросьте ваше исключение как

Это может быть расценено как менее читаемый код или плохая практика, но выможет бросить ваше исключение как есть.Этот шаблон нарушает спецификацию Reactive Streams, но реактор поймает выброшенное для вас исключение и передаст его в виде сигнала onError вашему нисходящему потоку

Takeaways

  1. Используйте оператор .handle в порядкечтобы обеспечить сложную обработку элементов
  2. Используйте concatMap + Mono.error, когда нам нужно вызвать исключение во время отображения, но такой метод наиболее подходит для случаев асинхронной обработки элементов.
  3. Использование flatMap + Mono.error когда у нас уже есть flatMap на месте
  4. Null, так как тип возврата запрещен, поэтому вместо null в нисходящем потоке map вы получите неожиданное onErrorс NullPointerException
  5. Используйте switchIfEmpty во всех случаях, когда вам нужно отправить сигнал об ошибке, если результат вызова какой-то конкретной функции завершился с empty stream
...