Spring WebClient и лонгполлинг - PullRequest
2 голосов
/ 18 июня 2020

Я хочу использовать реактивный WebClient Spring для опроса конечной точки REST, которая использует длинный опрос.

Конечная точка предоставляет сообщения для канала чата. Когда я вызываю его, а сообщения нет, он блокируется (т.е. не возвращается), пока не появится сообщение (или не пройдет 30 секунд).

Итак, в синхронном мире я бы выделил поток для отслеживания этого канал, вызовите конечную точку через RestTemplate, дождитесь результата, запишите его в общую очередь и начните следующий запрос. Затем потребитель может реагировать на новые элементы, появляющиеся в очереди.

В реактивном мире это немного другое. В идеале потребитель подписался бы на поток сообщений. Вопрос в том, как построить этот поток.

Лог c должен быть:

Mono<String> message = WebClient.get(). […] .bodyToMono(String.class);
// When the mono completes, create a new one just as described above
// Combine all of the monos into a Flux
flux.subscribe(message -> System.out.println("New message" + message);

Я думаю, что мне нужен какой-то оператор switch…, но я могу найти правильный один.

Ответы [ 2 ]

1 голос
/ 19 июня 2020

Как указано в @ 123:

Вы можете просто использовать повтор, т.е. WebClient.get(). […] .bodyToMono(String.class).repeat(), даст вам поток ответов и запустит следующий только тогда, когда предыдущий будет выполнен .

Фактически, здесь требуется defer() и repeat(): defer() принимает поставщика Monos, а repeat() повторно подписывается на Mono после завершения предыдущей подписки. . Это вызовет повторный вызов поставщика и, таким образом, будет запущен новый HTTP-запрос. HTTP-запрос в полете. Чтобы полностью завершить Flux, можно использовать takeUntilOther(), который требует другого издателя (например, EmitterProcessor). Затем в методе @PreDestroy вы можете вызвать shutdown.onNext(true), что приведет к отмене http-запроса.

Мое решение теперь выглядит так:

   Mono.defer(() -> receiveMessage())
   .repeat()
   .takeUntilOther(shutdown)
   .subscribe(message -> System.out.println("New message" + message);
0 голосов
/ 18 июня 2020

Если я понял это правильно, вы сможете решить это с помощью чего-то вроде

Flux.interval(Duration.ofSeconds(30))
    .flatMap(counter -> WebClient.get().[…].bodyToMono(String.class))
    .subscribe(message -> System.out.println("New message" + message))
...