Spring Reactor: что соответствует классу Optional <T>? - PullRequest
0 голосов
/ 28 апреля 2019

Итак, у меня есть Flux<Foo>, и я хочу сопоставить каждый Foo с Baz.Дело в том, что getBaz(Foo foo) может выдать IOException.

Так что я подумал о том, чтобы иметь Mono<Baz> getBazRx(Foo foo) метод, который будет возвращать либо Mono.just(baz), либо Mono.empty() в случае исключения.

Тогда получится Flux<Mono<Baz>>, который напоминает контейнер Optional<T>.

Так ли это делается в Spring Reactor?Как правильно его потреблять?

Ответы [ 2 ]

2 голосов
/ 28 апреля 2019

В реактивных потоках «необязательные» обычно обрабатываются путем удаления отсутствующих элементов из потока (например, пустого Mono или Flux с пропущенным элементом.), Вместо того, чтобы иметь Flux<Optional>, Mono<Optional> или Flux<Mono>

При вызове синхронного метода getBaz вы можете использовать одну операцию .handle, например:

flux
    .handle((foo, sink) -> {
        try {
            // propagate Baz down the stream
            sink.next(getBaz(foo));
        } catch (IOException e) {
            // Since sink.next is not called here,
            // the problematic element will be dropped from the stream
            log.error(e);
        }
    })

При вызове асинхронного getBazRxметод (возвращая Mono), вы можете использовать onErrorResume внутри flatMap / flatMapSequential / concatMap, например:

flux
    .flatMap(foo -> getBazRx(foo)
        .onErrorResume(t -> {
            log.error(t);
            return Mono.empty();
        }))

(или вы можете переместить .onErrorResume внутрь .getBazRx, в зависимости от того, где вы хотите поймать и игнорировать исключение)

Кроме того, поскольку вы упомянули об этом в своем вопросе ... если вы хотите создать getBazRx, который охватывает getBaz, вам следует никогда делать что-то подобное, если getBaz имеет потенциал для блокировки:

Mono<Baz> getBazRx(Foo foo) {
    // BAD!!!
    try {
        return Mono.just(getBaz(foo));
    } catch (IOException e) {
        return Mono.error(e)  // or Mono.empty() if you want to ignore
    }
}

Эта реализация на самом деле является просто синхронным методом олицетворением асинхронного метода.Есть две проблемы с ним:

  1. Работа выполняется немедленно, а не после подписки на возвращенные Mono
  2. Если getBaz блокируется, вы можете заблокировать событиеloop

Вместо этого вы должны отложить работу до подписки на моно и запустить любую операцию блокировки на Scheduler, предназначенную для операций блокировки, например:

Mono<Baz> getBazRx(Foo foo) {
    return Mono.fromSupplier(() -> {
            try {
                return getBaz(foo);
            } catch (IOException e) {
                throw Exceptions.propagate(e);  // or return null to ignore and complete empty
            }
        })
        .subscribeOn(Schedulers.elastic());  // run on a scheduler suitable for blocking work
}
0 голосов
/ 28 апреля 2019

Поскольку вы хотите пропустить ошибку (просто зарегистрируйте ее, например), вы можете использовать onErrorContinue. Кроме того, поскольку getBaz выдает проверенное исключение, нам нужно его перехватить и вместо него return (не throw) a RuntimeException. В Reactor есть служебный метод для этого Exceptions.propagate:

flux
  .map(foo -> {
      try {
        return getBaz(foo);
      } catch (IOException e) {
        return Exceptions.propagate(e);
      }
  })
  .onErrorContinue(RuntimeException.class, (t, b) -> log.error(t))
  .subscribe(baz -> log.info("Read value {}", baz));
...