Как использовать groupBy на горячем источнике - PullRequest
0 голосов
/ 14 марта 2019

С учетом следующего кода;

У меня есть фальшивый «горячий источник», на котором я хотел печатать последние значения за city каждые 2 секунды. Я вижу, что log точки А и В действуют так, как я ожидал. Тем не менее, кодовые блоки на groupBy и только каждый из них выдает окончательное значение в log точке C. Как можно сделать так, чтобы "C" излучал каждые 2 секунды.

public class Weather {
    String city;
    Integer temperature;

public Weather(String city, Integer temperature) {
    super();
    this.city = city;
    this.temperature = temperature;
}

@Override
public String toString() {
    return "Weather [city=" + city + ", temperature=" + temperature + "]";
}

public static void main(String[] args) {

    BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();

    new Thread(() -> {
        for (int d = 1; d < 100; d += 1) {
            for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
                queue.add(new Weather(s, d));
                try { Thread.sleep(250); } catch (InterruptedException e) {}
            }
        }
    }).start(); 

    Flux<Weather> outgoing = Flux.create(
        sink -> {
            for (int i = 0; i < 100; i++) {
                try {
                    sink.next(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            sink.complete();
        }
    );

    ConnectableFlux<Weather> subscriber = outgoing.publish();
    subscriber
    .buffer(Duration.ofSeconds(2))
    .log("A")
    .flatMap(Flux::fromIterable)
    .log("B")
    .groupBy(c -> c.city)
    .flatMap(Flux::last)
    .log("C")           

    .subscribe(s -> System.out.println(">>>>>" + s));


    subscriber.connect();
    System.exit(0);
}

}

1 Ответ

0 голосов
/ 16 марта 2019

Это, кажется, работает;

        subscriber
          .groupBy(c -> c.city)
          .flatMap(g -> g
            .take(Duration.ofSeconds(5))
            .takeLast(1)
          )
          .subscribe(s -> System.out.println(">>>>>" + s));
...