Концептуально, у меня есть источник, который излучает IP-адреса (на неопределенный срок) и два процессора.
Эти процессоры по существу делают запросы ввода-вывода.Я хотел бы объединить результаты этих процессоров, когда они будут готовы, и передать их в какой-то приемник, который мог бы обрабатывать оба результата вместе.
Я попытался написать какой-нибудь игрушечный пример, но этоне работает, поскольку source
Flux никогда не заканчивается.
Как правильно это сделать?
public class Demo {
public static void main(String[] args) throws Exception {
Flux<String> source = Flux.fromIterable(Lists.newArrayList("1.1.1.1", "2.2.2.2", "3.3.3.3")).delayElements(Duration.ofMillis(500)).repeat();
ConnectableFlux<String> ipsFlux = source.publish();
Flux<Foo> fooFlux1 = Flux.from(ipsFlux)
.map(ip -> new Foo(ip, "1"));
Flux<Foo> fooFlux2 = Flux.from(ipsFlux)
.map(ip -> new Foo(ip, "2"));
Flux.merge(fooFlux1, fooFlux2)
.groupBy(Foo::getId, Function.identity())
.subscribe(flux -> flux.collectMap(foo -> foo.type).subscribe(System.out::println));
ipsFlux.connect();
Thread.currentThread().join();
}
static class Foo {
String id;
String type;
public Foo(String id, String type) {
this.id = id;
this.type = type;
}
public String getId() {
return id;
}
@Override
public String toString() {
return "Foo{" +
"id='" + id + '\'' +
", value='" + type + '\'' +
'}';
}
}
}