С учетом следующего кода;
У меня есть фальшивый «горячий источник», на котором я хотел печатать последние значения за city
каждые 2 секунды. Я вижу, что log
точки А и В действуют так, как я ожидал. Тем не менее, кодовые блоки на groupBy
и только каждый из них выдает окончательное значение в log
точке C. Как можно сделать так, чтобы "C" излучал каждые 2 секунды.
public class Weather {
String city;
Integer temperature;
public Weather(String city, Integer temperature) {
super();
this.city = city;
this.temperature = temperature;
}
@Override
public String toString() {
return "Weather [city=" + city + ", temperature=" + temperature + "]";
}
public static void main(String[] args) {
BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
new Thread(() -> {
for (int d = 1; d < 100; d += 1) {
for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
queue.add(new Weather(s, d));
try { Thread.sleep(250); } catch (InterruptedException e) {}
}
}
}).start();
Flux<Weather> outgoing = Flux.create(
sink -> {
for (int i = 0; i < 100; i++) {
try {
sink.next(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
);
ConnectableFlux<Weather> subscriber = outgoing.publish();
subscriber
.buffer(Duration.ofSeconds(2))
.log("A")
.flatMap(Flux::fromIterable)
.log("B")
.groupBy(c -> c.city)
.flatMap(Flux::last)
.log("C")
.subscribe(s -> System.out.println(">>>>>" + s));
subscriber.connect();
System.exit(0);
}
}