Поток параллельно-последовательное исполнение с groupBy - PullRequest
0 голосов
/ 09 апреля 2019

Скажите, что у меня есть это:

Flux<GroupedFlux<Integer, Integer>> intsGrouped = Flux.range(0, 12)
   .groupBy(i -> i % 3);

и скажите, что у меня есть метод:

Mono<Integer> getFromService(Integer i);

Я хочу вызвать getFromService параллельно для каждой из групп, но сделатьуверен, что звонки являются последовательными в каждой группе.

Для приведенного выше примера это будет три параллельных потока с этими входными значениями:

stream 1: 0 -> 3 -> 6 -> 9
stream 2: 1 -> 4 -> 7 -> 10
stream 3: 2 -> 5 -> 8 -> 11

Я пробовал это, но это не то, что я хочу:

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.flatMap(i -> getFromService(g.key(), i)))

Это вызов службы параллельно для всех целых.Как мне поступить?

1 Ответ

1 голос
/ 13 апреля 2019

Используйте concatMap или flatMapSequential вместо внутреннего .flatMap

Если вы хотите последовательное выполнение в каждомgroup (то есть только одна подписка на getFromService за один раз в каждой группе), затем используйте .concatMap, например:

Flux.range(0, 12)
   .groupBy(i -> i % 3)
   .flatMap(g -> g.concatMap(i -> getFromService(g.key(), i)))

Если параллельное выполнение в группе ok , но вы просто заботитесь о порядке, в котором выдается последовательность, затем используйте flatMapSequential, например:

Flux.range(0, 12)
    .groupBy(i -> i % 3)
    .flatMap(g -> g.flatMapSequential(i -> getFromService(g.key(), i)))

Другой вариант - использовать .flatMap с аргументом concurrency, установленным в1, но я бы порекомендовал один из вышеперечисленных.

...