Остановить поток реактора и отписаться после получения 1 события - PullRequest
0 голосов
/ 08 марта 2019

У меня следующая проблема: я построил клиент реактор.core.publisher.Flux, используя Spring для прослушивания SSE, которые ожидают некоторые события и если они не получают их через 15 секунд (timeout (Duration.ofSeconds (15) ))) это будет делать что-то еще. Тем не менее, я хотел бы, чтобы поток остановился и отписался от потребителя, если событие было получено в первые 15 секунд. Вот некоторый код:

flux = webClient.get()
                .uri(URI)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<PagedResources<Foo>>() {})
                .timeout(ofSeconds(15));
        eventStream.doOnError(e -> {
            logger.info("No event was received in 15 seconds!");
        }).subscribe(new CustomConsumer(reporter)); 

Ответы [ 2 ]

2 голосов
/ 08 марта 2019

Это именно то, что делает оператор take:

.bodyToFlux(String.class)
.take(1)
0 голосов
/ 11 марта 2019

Я думаю, что blockFirst (Продолжительность длительности) является ответом в моем случае, потому что он ждет некоторое время, а после этого выбрасывает тайм-аут.

...