Ваш вопрос не очень понятен.Я бы сказал, что проверьте операторы merge () и concat ().Это должно помочь.
Редактировать: Исходя из предоставленной дополнительной информации, вопрос теперь ясен.Одно из возможных решений приведено ниже:
@Test
public void testBasket() {
Egg eggA = new Egg("A");
Egg eggB = new Egg("B");
Egg eggC = new Egg("C");
Egg eggD = new Egg("D");
Basket basket1 = new Basket("basket1");
Basket basket2 = new Basket("basket2");
Sorter sorter = new Sorter();
Sorter updatedSorter = Flux .just((Item) basket1, (Item) basket2, (Item) eggA, (Item) eggB, (Item) eggC,
(Item) eggD)
.map(sorter::add)
.blockLast();
updatedSorter.process();
Flux<Basket> fluxBasket = Flux.fromStream(sorter.baskets.stream());
fluxBasket.subscribe(d -> System.out.println("data:" + d));
}
class Sorter {
List<Egg> eggs = new ArrayList<Egg>();
List<Basket> baskets = new ArrayList<Basket>();
public Sorter add(Item item) {
if (item.isBasket)
baskets.add((Basket) item);
else
eggs.add((Egg) item);
return this;
}
public Sorter process() {
System.out.println("---- Processing Eggs ----");
for (Basket basket : baskets) {
basket.addEggs(eggs);
}
System.out.println("---- Processing Done ----");
System.out.println(this.toString());
return this;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("(");
for (Basket basket : baskets) {
sb.append(basket.toString() + ",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(")");
return sb.toString();
}
}
class Item {
boolean isEgg;
boolean isBasket;
}
class Basket extends Item {
public Basket(String name) {
this.name = name;
isBasket = true;
}
String name;
List<Egg> eggs = new ArrayList<Egg>();
public void addEggs(List<Egg> eggs) {
this.eggs = eggs;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(this.name);
sb.append("(");
for (Egg egg : eggs) {
sb.append(egg.toString() + ",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(")");
return sb.toString();
}
}
class Egg extends Item {
public Egg(String name) {
this.name = name;
isEgg = true;
}
String name;
public String toString() {
return this.name;
}
}
Вывод:
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B,C,D),basket2(A,B,C,D))
data:basket1(A,B,C,D)
data:basket2(A,B,C,D)
Edit2:
Другое решение без вызова блокировки:
@Test public void testBasket () {
Egg eggA = new Egg("A");
Egg eggB = new Egg("B");
Egg eggC = new Egg("C");
Egg eggD = new Egg("D");
Basket basket1 = new Basket("basket1");
Basket basket2 = new Basket("basket2");
Sorter sorter = new Sorter();
Mono<Sorter> bucketsMono = Flux .just((Item) basket1, (Item) basket2, (Item) eggA, (Item) eggB, (Item) eggC,
(Item) eggD)
.map(sorter::add)
.reduce((sorter1, sorter2) -> sorter.process());
bucketsMono.subscribe(d -> System.out.println("data:" + d));
}
Вывод:
(basket1),basket2))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A),basket2(A))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B),basket2(A,B))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B,C),basket2(A,B,C))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B,C,D),basket2(A,B,C,D))
data:(basket1(A,B,C,D),basket2(A,B,C,D))