Самый эффективный способ разделения потока на несколько потоков в Reactor 3 - PullRequest
1 голос
/ 28 апреля 2019

В Reactor 3, каков наиболее эффективный способ разделения гетерогенного потока на несколько потоков путем сопоставления с образцом? (И последующие операции на каждом потоке могут быть очень разными)

Например,

Source Flux: a->b->c->a->b->c
 ||
 vv
A Flux: a->a->a
B Flux: b->b->b
C Flux: c->c->c

Я новичок в реактивном программировании, и единственное решение, которое я придумаю, это share() + filter(), как

val shared = flux.share();
shared.filter(x -> x.tag=='a').subscribe(a -> consumeA(a));
shared.filter(x -> x.tag=='b').subscribe(b -> consumeB(b));
shared.filter(x -> x.tag=='c').subscribe(c -> consumeC(c));

Это лучшее решение или есть лучшая парадигма для этой проблемы?

1 Ответ

3 голосов
/ 28 апреля 2019

Если количество групп довольно мало, то вы можете использовать Flux.groupBy, на которые есть ссылки в документах по реактору проекта

Например:

Flux<String> flux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
        .groupBy(s -> s.charAt(0))
        .concatMap(groupedFlux -> groupedFlux
                .startWith("Group " + groupedFlux.key()));

StepVerifier.create(flux)
        .expectNext("Group a", "a1", "a2")
        .expectNext("Group b", "b1", "b2")
        .expectNext("Group c", "c1", "c2")
        .verifyComplete();

Вы можете использовать groupedFlux.key() для изменения операций, выполняемых для каждой группы.

...