Обработка Mono Inside Flux Flatmap - PullRequest
0 голосов
/ 31 декабря 2018

У меня есть поток строк.Для каждой строки я должен сделать удаленный вызов.Но проблема в том, что метод, который делает удаленный вызов, на самом деле возвращает Mono ответа (очевидно, поскольку, соответствующий одному запросу, будет один ответ).

Какой должен быть правильный шаблон дляобрабатывать такие случаи?Одно из решений, которое я могу придумать, - это делать последовательные (или параллельные) вызовы для элементов потока и сокращать ответы до одного и возвращать.

Вот код:

fluxObj.flatmap(a -> makeRemoteCall(a)//converts the Mono of the response to a Flux).reduce(...)

IЯ не могу обернуть мою голову внутри flatmap. Метод makeRemoteCall возвращает Mono.Но flatmap возвращает Flux ответа.Во-первых, почему это происходит?Во-вторых, означает ли это, что возвращенный Flux содержит один объект ответа (который был возвращен в Mono)?

Ответы [ 2 ]

0 голосов
/ 02 января 2019

Если mapper Function возвращает Mono, то это означает, что будет (самое большее) одно производное значение для каждого исходного элемента в Flux.

Возврат Function:

  • пустой моно (например, Mono.empty()) для данного значения означает, что это исходное значение «игнорируется»
  • и оценивается Mono(как в вашем примере) означает, что это исходное значение асинхронно отображается на другое конкретное значение
  • a Flux с несколькими производными значениями для данного значения, означает, что это исходное значение асинхронно отображается на несколько значений

Например, с учетом следующего flatMap:

Flux.just("A", "B")
    .flatMap(v -> Mono.just("value" + v))

Подписка на вышеприведенное Flux<String> и печать выпущенных элементов приведет к:

valueA
valueB

Другойзабавный пример: с задержками можно выйти из результатов заказа.Вот так:

Flux.just(300, 100)
    .flatMap(delay -> Mono.delay(Duration.ofMillis(delay))
                          .thenReturn(delay + "ms")
    )

приведет к Flux<String>, что даст:

100ms
300ms
0 голосов
/ 02 января 2019

Если вы видите документацию flatMap , вы можете найти ответы на свои вопросы:

Преобразовать элементы, испускаемые этим потоком, асинхронно в издателей, а затем сгладить этих внутренних издателей водин поток, последовательно и сохраняющий порядок с использованием конкатенации.

Короче говоря,

@Test
public void testFlux() {
    Flux<String> oneString = Flux.just("1");

    oneString
        .flatMap(s -> testMono(s))
        .collectList()
        .subscribe(integers -> System.out.println("elements:" + integers));       

}

private Mono<Integer> testMono(String s) {
    return Mono.just(Integer.valueOf(s + "0"));
}

картограф - s -> testMono(s)где testMono(s) - это Publisher (в вашем случае makeRemoteCall(a)), он преобразует тип моего oneString в Integer .

Я собрал результат Flux вСписок, и распечатал его.Вывод на консоль:

elements:[10]

Это означает, что результат Flux после оператора flatMap содержит только один элемент.

...