Есть ли другой способ создать рабочий процесс (Список) с контекстом по проекту реактора? - PullRequest
0 голосов
/ 16 мая 2019

Рабочий интерфейс

    interface Processor {
        Mono<Context> process(Context context);
    }

Рабочий процесс

        List<Processor[]> processorsList = Lists.list(
                new Processor[] {processorImpl1},
                new Processor[] {processorImpl2}
        );

Это моя цель.


        /*
        List
        sequential work
        start -> Processor[] -> Processor[]

        Array[]
        parallel work
        start -> Processor
              -> Processor
              -> Processor
         */

Я пробовал это.

        Flux
                .fromIterable(processorsList)
                .reduce(
                        Mono.subscriberContext(),
                        (contextMono, processors) -> contextMono
                                .flatMap(c -> Flux
                                        .fromArray(processors)
                                        .parallel()
                                        .flatMap(processor -> processor.process(c))
                                        .reduce(Context::putAll)
                                        .map(c::putAll)))
                .flatMap(Function.identity())
                .subscriberContext(context)
                .subscribe(c -> log.info("end={}", JSON.toJSONString(
                        c.stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.toList()))));

Я думаю, что немного неудобно иметь дело с таким контекстом.

Есть ли более элегантный способ?

...