Я работаю над проектом, использующим стек Spring WebFlux.У нас есть Контроллер, где вы можете подписаться на обновления для определенного объекта.Этот контроллер возвращает EmitterProcessor
, где клиенты могут подписаться.Когда что-то публикуется на EmitterProcessor
, подписанные клиенты получают уведомление.
На практике это хорошо работает, но мой модульный тест не проходит.В модульном тесте используется WebTestClient , который блокируется, когда операция exchange()
возвращает EmitterProcessor
(также пробовали с другими реализациями FluxProcessor
, такими как UnicastProcessor
).Я получаю следующую ошибку:
java.lang.IllegalStateException: Тайм-аут при чтении блокировки для 5000 МИЛЛИСЕКОНД
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
at reactor.core.publisher.Mono.block(Mono.java:1524)
at org.springframework.test.web.reactive.server.DefaultWebTestClient$DefaultRequestBodyUriSpec.exchange(DefaultWebTestClient.java:283)
Я нашел этот поток , который также сообщает, что метод exchange()
для блоков WebTestClient
блокируется, но, как объяснено там, он блокируется только для получения статуса и заголовков, поэтому это не должно быть проблемой.Кроме того, в случае, если возвращается Flux
, это прекрасно работает, что демонстрируется ссылочным testcase .
Test case
Я создал простой тестовый примервзято из ссылочного тестового примера и адаптировано для возврата Flux
в одном случае и EmitterProcessor
в другом случае (что не удается).Как вы могли заметить, утверждения для EmitterProcessor
должны его потерпеть неудачу, но оно никогда не получится из-за блокирующего вызова exchange()
.
Также обратите внимание, что когда я раскомментирую строку processor.onNext("hello");
, это содержимоевозвращается, и тестовый сценарий успешен.
package test;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;
import org.junit.Test;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class ProcessorTest {
private final WebTestClient client = WebTestClient.bindToController(new StringController()).configureClient().build();
@Test
public void entityStream() {
FluxExchangeResult<String> result = client //
.get().uri("/flux") //
.accept(TEXT_EVENT_STREAM) //
.exchange() //
.expectStatus().isOk() //
.expectHeader().contentTypeCompatibleWith(TEXT_EVENT_STREAM) //
.returnResult(String.class);
StepVerifier.create(result.getResponseBody()) //
.expectNext("hello") //
.thenCancel() //
.verify();
}
@Test
public void entityStreamProcessor() {
FluxExchangeResult<String> result = client //
.get().uri("/processor") //
.accept(TEXT_EVENT_STREAM) //
.exchange() //
.expectStatus().isOk() //
.expectHeader().contentTypeCompatibleWith(TEXT_EVENT_STREAM) //
.returnResult(String.class);
StepVerifier.create(result.getResponseBody()) //
.expectNext("hello") //
.thenCancel() //
.verify();
}
@RestController
static class StringController {
@GetMapping(value = "/flux", produces = TEXT_EVENT_STREAM_VALUE)
Flux<String> getFlux() {
return Flux.just("hello");
}
@GetMapping(value = "/processor", produces = TEXT_EVENT_STREAM_VALUE)
Flux<String> getProcessor() {
EmitterProcessor<String> processor = EmitterProcessor.create();
// processor.onNext("hello");
return processor;
}
}
}