Как мне указать планировщик для Flux.generate - PullRequest
0 голосов
/ 02 ноября 2019

Как мне указать планировщик для Flux.generate? Внутри него есть блокирующий звонок, который я хотел бы отменить. До сих пор я взломал его через

Flux<Integer> generate = Flux.generate(....);
Mono<List<Integer>> fut =
        Flux.just("ignored")
                .publishOn(Schedulers.single())
                .flatMap(ignored -> generate)
                .timeout(Duration.ofSeconds(2), Flux.empty())

Есть ли более идиоматический способ?

1 Ответ

1 голос
/ 02 ноября 2019

Использовать подписку на

        Flux<Integer> g1 = Flux.generate(c -> {
            System.out.println(Thread.currentThread());
            c.next(1);
        });

        System.out.println(g1.take(5).collectList().block());

        Flux<Integer> g2 = g1.subscribeOn(Schedulers.elastic());

        System.out.println(g2.take(5).collectList().block());

Вывод

Thread[main,5,main]
Thread[main,5,main]
Thread[main,5,main]
Thread[main,5,main]
Thread[main,5,main]
[1, 1, 1, 1, 1]
Thread[elastic-2,5,main]
Thread[elastic-2,5,main]
Thread[elastic-2,5,main]
Thread[elastic-2,5,main]
Thread[elastic-2,5,main]
[1, 1, 1, 1, 1]
...