Пружинный реактор: как дождаться нескольких ключей Flux? - PullRequest
2 голосов
/ 09 июля 2019

Концептуально, у меня есть источник, который излучает IP-адреса (на неопределенный срок) и два процессора.

Эти процессоры по существу делают запросы ввода-вывода.Я хотел бы объединить результаты этих процессоров, когда они будут готовы, и передать их в какой-то приемник, который мог бы обрабатывать оба результата вместе.

Я попытался написать какой-нибудь игрушечный пример, но этоне работает, поскольку source Flux никогда не заканчивается.

Как правильно это сделать?

public class Demo {

    public static void main(String[] args) throws Exception {


        Flux<String> source = Flux.fromIterable(Lists.newArrayList("1.1.1.1", "2.2.2.2", "3.3.3.3")).delayElements(Duration.ofMillis(500)).repeat();
        ConnectableFlux<String> ipsFlux = source.publish();

        Flux<Foo> fooFlux1 = Flux.from(ipsFlux)
                .map(ip -> new Foo(ip, "1"));

        Flux<Foo> fooFlux2 = Flux.from(ipsFlux)
                .map(ip -> new Foo(ip, "2"));

        Flux.merge(fooFlux1, fooFlux2)
                .groupBy(Foo::getId, Function.identity())
                .subscribe(flux -> flux.collectMap(foo -> foo.type).subscribe(System.out::println));

        ipsFlux.connect();

        Thread.currentThread().join();

    }

    static class Foo {
        String id;
        String type;

        public Foo(String id, String type) {
            this.id = id;
            this.type = type;
        }

        public String getId() {
            return id;
        }

        @Override
        public String toString() {
            return "Foo{" +
                    "id='" + id + '\'' +
                    ", value='" + type + '\'' +
                    '}';
        }
    }
}

1 Ответ

1 голос
/ 12 июля 2019

при просмотре документации оператора слияния (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#merge-org.reactivestreams.Publisher...-) кажется, что слияние не подходит для обработки потоков infinte:

Обратите внимание, что слияние предназначено для работы с асинхронными или конечными источниками. При работе с бесконечным источником, который еще не публикуется в выделенном планировщике, вы должны изолировать этот источник в своем собственном планировщике, так как в противном случае слияние попытается истощить его перед подпиской на другой источник.

я бы попробовал почтовый оператор (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#zip-org.reactivestreams.Publisher-org.reactivestreams.Publisher-)

Flux<Tuple2<Foo, Foo>> zipped = Flux.zip(fooFlux1, fooFlux2);

тогда ваш приемник может потреблять пару Foo, как только она станет доступной.

...