Spring 5 FluxSink не отправляет данные при выполнении fluxSink.next - PullRequest
0 голосов
/ 23 мая 2018

У меня есть один пример метода GET, который генерирует Randon INT и отправляет обратно клиенту

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {

            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();


    });
}

Curl:

curl -v -H "Принять:text / event-stream "-X GET 'http://localhost:8080/stream

С помощью этого кода с помощью команды CURL я могу видеть цифры в моем клиенте только тогда, когда происходит fluxSink.complete.

Теперь, если я изменю свой код на:

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {
        Thread t = new Thread(() -> {
            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();
        });
        t.start();
    });
}

Оборачивая процесс в поток, он работает нормально.Я могу видеть, что данные передаются нормально, когда происходит fluxSink.next.

Кто-нибудь может объяснить этот эффект?Как я могу видеть поток данных без явного использования Thread?

Спасибо!

1 Ответ

0 голосов
/ 28 мая 2018

Flux.create() - это «испускание нескольких элементов синхронным или асинхронным способом через API FluxSink».В вашем случае вы синхронно излучаете все элементы и не даете Reactor возможности выполнять работу, поскольку вы никогда не выходите из этого метода до тех пор, пока все не будет сделано.В двух словах, вы блокируете текущий поток, что запрещено в Reactor.

Использование потока оставляет Reactor возможность отправлять эти сигналы.Но дух Flux.create() (как видно из его javadoc) заключается в адаптации к обратным вызовам.

В этом случае, возможно, Flux.generate подойдет лучше.Попытка симулировать задержку с Thread не очень хорошая идея.Вместо этого вы можете сделать что-то вроде:

Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));
...