Итерация потока, выполнение моно внутри, использование результата на следующем шаге - PullRequest
1 голос
/ 26 марта 2020

Я хочу сделать что-то подобное, как показано ниже, где результат предыдущего вызова используется при следующем вызове той же службы с использованием Project Reactor.

Message current;
Message next;
for each (Step step in steps)
{
    current = new Message(step, next);
    next = execute(current);
}

Это то, что я пытаюсь чтобы сделать использование реактора:

  1. Для каждого "шага" (в потоке)

    a. Создание сообщения для этого шага и последнего результата (ноль для начала).

    b. Вызовите службу, используя сообщение, и получите результат (моно).

    c. Установите последнее сообщение для этого результата, чтобы его можно было использовать в 1a.

  2. Возьмите самый последний результат

Моя неудачная попытка сделать это так далеко выглядит:

return fromIterable(request.getPipeline())
    .map(s -> PipelineMessage.builder()
        .client(client)
        .step(s.getStep())
        .build())
    .flatMap(z -> {
        return this.pipelineService.execute(z);
    })
    .last()
    .map(m -> ok()
        .entity(m.getPayload())
        .type(m.getType())
        .build());

1 Ответ

1 голос
/ 27 марта 2020

Не уверен в ваших точных требованиях. Но я думаю, что здесь может помочь reduce функция.

Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));

stringFlux
        .reduce("empty", (next, step) -> {
            String current = message(step, next);
            return execute(current);
        })
        .map(String::toUpperCase)
        .subscribe(System.out::println);

Здесь функции message и execute являются такими:

String message(String step, String next){
    return "message[" + step + ":" + next + "]";
}

String execute(String current){
    return "execute(" + current + ")";
}

Конечным выводом будет последнее выполненное сообщение.

EXECUTE(MESSAGE[C:EXECUTE(MESSAGE[B:EXECUTE(MESSAGE[A:EMPTY])])])

Начальный шаг не может быть null здесь. Вместо этого вы можете использовать пустой шаговый объект и рассматривать его как null.


Flux<String> stringFlux = Flux.fromIterable(Arrays.asList("a", "b", "c"));

stringFlux
        .reduce(Flux.just("empty"), (Flux<String> next, String step) -> {
            Flux<String> current = message(step, next);
            return current.flatMap(this::execute);
        })
        .flatMapMany(a -> a)
        .subscribe(System.out::println);


Flux<String> message(String step, Flux<String> next){
    return next.map(v -> "message(" + v + ":" + step + ")");
}

Mono<String> execute(String current){
    return Mono.just("execute(" + current + ")");
}

Вывод:

execute(message(execute(message(execute(message(empty:a)):b)):c))
...