Слить выбросы потока с дублированием? - PullRequest
0 голосов
/ 26 октября 2018

У меня есть Flux, который испускает предметы:

data class Item(
  val isEgg: Boolean,
  val isBasket: Boolean
)

Рассмотрим 2 выброса «корзины» и 4 «яйца».Я хотел бы объединить эти выбросы в два: каждый содержит одну «корзину» и 4 «яйца»:

Flux emissions and it's transformation

Кто-нибудь знает о такой трансформации?Поток конечен и не должен превышать 1КБ.

РЕДАКТИРОВАТЬ:

То, чего я достиг на данный момент - я сгруппировал выбросы в GroupedFlux.Теперь мне нужно было бы объединить GroupedFlux, содержащий Basket1, Basket2 со вторым, содержащим 'Яйца', чтобы получить две корзины с "дублированными" яйцами в каждой.

    val flux = Flux.just("Egg1", "Egg2", "Basket1", "Egg3", "Egg4", "Basket2")

    val block = flux.groupBy {
        it.startsWith("Egg")
    }

Желаемый поток: Flux.just("Basket1(Egg1,Egg2, Egg3, Egg4)","Basket2(Egg1,Egg2, Egg3, Egg4)")

Ответы [ 2 ]

0 голосов
/ 29 октября 2018

Вы можете достичь этого результата с flatMap и reduce:

void combine() {
  Flux<String> flux = 
    Flux.just("Egg1", "Egg2", "Basket1", "Egg3", "Egg4", "Basket2");
  Flux<String> eggs = flux.filter(str -> str.startsWith("Egg"));
  Flux<String> basketNames = flux.filter(str -> str.startsWith("Basket"));
  basketNames.flatMap(basketName -> eggs.reduce(
      new Basket(basketName),
      (basket, egg) -> {
        basket.add(egg);
        return basket;
      })
  );
}

class Basket {

  private final String name;
  private final List<String> eggs;

  Basket(final String name) {
    this.name = name;
    this.eggs = new ArrayList<>();
  }

  public void add(String egg) {
    eggs.add(egg);
  }
}
0 голосов
/ 27 октября 2018

Ваш вопрос не очень понятен.Я бы сказал, что проверьте операторы merge () и concat ().Это должно помочь.

Редактировать: Исходя из предоставленной дополнительной информации, вопрос теперь ясен.Одно из возможных решений приведено ниже:

@Test
public void testBasket() {

    Egg eggA = new Egg("A");
    Egg eggB = new Egg("B");
    Egg eggC = new Egg("C");
    Egg eggD = new Egg("D");

    Basket basket1 = new Basket("basket1");
    Basket basket2 = new Basket("basket2");

    Sorter sorter = new Sorter();
    Sorter updatedSorter = Flux .just((Item) basket1, (Item) basket2, (Item) eggA, (Item) eggB, (Item) eggC,
            (Item) eggD)
                                .map(sorter::add)
                                .blockLast();

    updatedSorter.process();

    Flux<Basket> fluxBasket = Flux.fromStream(sorter.baskets.stream());

    fluxBasket.subscribe(d -> System.out.println("data:" + d));

}

class Sorter {
    List<Egg> eggs = new ArrayList<Egg>();
    List<Basket> baskets = new ArrayList<Basket>();

    public Sorter add(Item item) {
        if (item.isBasket)
            baskets.add((Basket) item);
        else
            eggs.add((Egg) item);

        return this;
    }

    public Sorter process() {
        System.out.println("---- Processing Eggs ----");

        for (Basket basket : baskets) {
            basket.addEggs(eggs);
        }

        System.out.println("---- Processing Done ----");

        System.out.println(this.toString());

        return this;
    }

    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("(");
        for (Basket basket : baskets) {
            sb.append(basket.toString() + ",");
        }
        sb.deleteCharAt(sb.length() - 1);
        sb.append(")");
        return sb.toString();
    }

}

class Item {
    boolean isEgg;
    boolean isBasket;
}

class Basket extends Item {
    public Basket(String name) {
        this.name = name;
        isBasket = true;
    }

    String name;

    List<Egg> eggs = new ArrayList<Egg>();

    public void addEggs(List<Egg> eggs) {
        this.eggs = eggs;
    }

    public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append(this.name);
        sb.append("(");
        for (Egg egg : eggs) {
            sb.append(egg.toString() + ",");
        }
        sb.deleteCharAt(sb.length() - 1);
        sb.append(")");
        return sb.toString();
    }
}

class Egg extends Item {
    public Egg(String name) {
        this.name = name;
        isEgg = true;
    }

    String name;

    public String toString() {
        return this.name;
    }
}

Вывод:

---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B,C,D),basket2(A,B,C,D))
data:basket1(A,B,C,D)
data:basket2(A,B,C,D)

Edit2:

Другое решение без вызова блокировки:

@Test public void testBasket () {

    Egg eggA = new Egg("A");
    Egg eggB = new Egg("B");
    Egg eggC = new Egg("C");
    Egg eggD = new Egg("D");

    Basket basket1 = new Basket("basket1");
    Basket basket2 = new Basket("basket2");

    Sorter sorter = new Sorter();
    Mono<Sorter> bucketsMono = Flux .just((Item) basket1, (Item) basket2, (Item) eggA, (Item) eggB, (Item) eggC,
            (Item) eggD)
                                    .map(sorter::add)
                                    .reduce((sorter1, sorter2) -> sorter.process());

    bucketsMono.subscribe(d -> System.out.println("data:" + d));


}

Вывод:

(basket1),basket2))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A),basket2(A))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B),basket2(A,B))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B,C),basket2(A,B,C))
---- Processing Eggs ----
---- Processing Done ----
(basket1(A,B,C,D),basket2(A,B,C,D))
data:(basket1(A,B,C,D),basket2(A,B,C,D))
...