Рабочий интерфейс
interface Processor {
Mono<Context> process(Context context);
}
Рабочий процесс
List<Processor[]> processorsList = Lists.list(
new Processor[] {processorImpl1},
new Processor[] {processorImpl2}
);
Это моя цель.
/*
List
sequential work
start -> Processor[] -> Processor[]
Array[]
parallel work
start -> Processor
-> Processor
-> Processor
*/
Я пробовал это.
Flux
.fromIterable(processorsList)
.reduce(
Mono.subscriberContext(),
(contextMono, processors) -> contextMono
.flatMap(c -> Flux
.fromArray(processors)
.parallel()
.flatMap(processor -> processor.process(c))
.reduce(Context::putAll)
.map(c::putAll)))
.flatMap(Function.identity())
.subscriberContext(context)
.subscribe(c -> log.info("end={}", JSON.toJSONString(
c.stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.toList()))));
Я думаю, что немного неудобно иметь дело с таким контекстом.
Есть ли более элегантный способ?