onSubscribe приходит слишком поздно при использовании fromCallable - PullRequest
0 голосов
/ 27 августа 2018

У меня довольно частое требование выполнять некоторую логику в onSubscribe и onFinally, что может быть достигнуто с помощью

private <T> Mono<T> doAtStartAndEnd(Mono<T> source) {
  return source.doOnSubscribe((s) -> {
      System.out.println("ON SUBSCRIBE");
    }).doFinally((f) -> {
      System.out.println("ON FINALLY");
    });
}

и используя его через transform в следующей цепочке:

List<String> result = Mono.fromCallable(() -> getListOfStrings())
// .log()
  .flatMapIterable(list -> list)
  .map(String::toUpperCase)
  .collectList()
  .transform(this::doAtStartAndEnd)
  .block();

Конечно, ожидаемое поведение таково, что ON SUBSCRIBE появляется в консоли перед вызовом вызываемого здесь, getListOfStrings(). Однако логика подписки MonoFlattenIterable ведет к противоположному поведению. Это относится не только к flatMapIterable, но и к различным другим операторам, таким как zip.

Если я раскомментирую строку с .log(), цепочка будет работать как положено.

Может быть, это точно так же, как в Reactive Gem # 22 * ​​1019 *, но как мне добиться желаемого поведения, не оборачивая Mono / Flux снова, например? в Mono.defer(() -> Mono.fromCallable(() -> getListOfStrings()))?

1 Ответ

0 голосов
/ 28 августа 2018

flatMapIterable пытается избежать полного цикла подписки и запроса RS, чтобы получить Iterable, когда источник обнаружен как Callable. Вместо этого он напрямую вызывает метод call().

Это хорошо для таких случаев, как Flux.just(myList).flatMapIterable(list -> list). Тем не менее, я думаю, что это может быть слишком много сделать на Mono.fromCallable ...

Одним из способов подавления обнаружения вашего Mono.fromCallable() для этой оптимизации является использование .hide() после нее. Что касается .log(), это меняет экземпляр, который видим на flatMapIterable, на что-то, что НЕ Callable.

...