Как охотно объединить два Flux? - PullRequest
0 голосов
/ 03 марта 2019
Flux<Long> flux1 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
        })
        .filter(aLong -> aLong % 2 == 0)
        .doOnNext(aLong -> System.out.println("flux 1 : " + aLong));

Flux<Long> flux2 = Flux
        .<Long>create(fluxSink -> {
            for (long i = 0; i < 20; i++) {
                fluxSink.next(i);
            }
        })
        .filter(aLong -> aLong % 2 == 1)
        .doOnNext(aLong -> System.out.println("flux 2 : " + aLong));

Flux.merge(flux1, flux2)
        .doOnNext(System.out::println)
        .then()
        .block();

Создать два Flux<Long> как верхний код.

flux1 создать поток четных чисел (0,2,4,6,8 ...) flux2 создать поток нечетных чисел (1,3,5,7,9 ...)

Я ожидал, когда слияние этих 2 потоков 1 и 2 работает как

0,1,2,3,4 ... или 0,2,1,3,4.. зависит от вычислительной мощности

, новсегда тратить flux1 и тратить flux2 (flux1 start)0,2,4,6,8, ... 16,18,(flux1 end)(flux2 start)1,3,5,7 ... 17,19

как подписаться на несколько событий потока с нетерпением?

1 Ответ

0 голосов
/ 08 марта 2019

Оба потока работают в одном потоке.Когда вы подписываетесь flux1 начинает толкать данные, пока не закончится.Только тогда поток свободен для flux2 продолжения.Оператор merge выдает значения в порядке их поступления.Он не переключается между первым и вторым потоками.

Если вы хотите, чтобы потоки запускались одновременно, вам нужно запустить их в разных потоках, например, с помощью оператора publishOn.

Flux<Long> flux1 = Flux
    .<Long>create(fluxSink -> {
        for (long i = 0; i < 20; i++) {
            fluxSink.next(i);
        }
    })
    .publishOn(Schedulers.newSingle("thread-x")
    .filter(aLong -> aLong % 2 == 0)
    .doOnNext(aLong -> System.out.println("flux 1 : " + aLong));
...