Блоки WebTestClient в модульном тесте, когда контроллер возвращает EmitterProcessor вместо Flux - PullRequest
0 голосов
/ 30 января 2019

Я работаю над проектом, использующим стек Spring WebFlux.У нас есть Контроллер, где вы можете подписаться на обновления для определенного объекта.Этот контроллер возвращает EmitterProcessor, где клиенты могут подписаться.Когда что-то публикуется на EmitterProcessor, подписанные клиенты получают уведомление.

На практике это хорошо работает, но мой модульный тест не проходит.В модульном тесте используется WebTestClient , который блокируется, когда операция exchange() возвращает EmitterProcessor (также пробовали с другими реализациями FluxProcessor, такими как UnicastProcessor).Я получаю следующую ошибку:

java.lang.IllegalStateException: Тайм-аут при чтении блокировки для 5000 МИЛЛИСЕКОНД

at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:117)
at reactor.core.publisher.Mono.block(Mono.java:1524)
at org.springframework.test.web.reactive.server.DefaultWebTestClient$DefaultRequestBodyUriSpec.exchange(DefaultWebTestClient.java:283)

Я нашел этот поток , который также сообщает, что метод exchange() для блоков WebTestClient блокируется, но, как объяснено там, он блокируется только для получения статуса и заголовков, поэтому это не должно быть проблемой.Кроме того, в случае, если возвращается Flux, это прекрасно работает, что демонстрируется ссылочным testcase .

Test case

Я создал простой тестовый примервзято из ссылочного тестового примера и адаптировано для возврата Flux в одном случае и EmitterProcessor в другом случае (что не удается).Как вы могли заметить, утверждения для EmitterProcessor должны его потерпеть неудачу, но оно никогда не получится из-за блокирующего вызова exchange().

Также обратите внимание, что когда я раскомментирую строку processor.onNext("hello");, это содержимоевозвращается, и тестовый сценарий успешен.

package test;

import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;

import org.junit.Test;
import org.springframework.test.web.reactive.server.FluxExchangeResult;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class ProcessorTest {

  private final WebTestClient client = WebTestClient.bindToController(new StringController()).configureClient().build();

  @Test
  public void entityStream() {

    FluxExchangeResult<String> result = client //
        .get().uri("/flux") //
        .accept(TEXT_EVENT_STREAM) //
        .exchange() //
        .expectStatus().isOk() //
        .expectHeader().contentTypeCompatibleWith(TEXT_EVENT_STREAM) //
        .returnResult(String.class);

    StepVerifier.create(result.getResponseBody()) //
        .expectNext("hello") //
        .thenCancel() //
        .verify();
  }

  @Test
  public void entityStreamProcessor() {

    FluxExchangeResult<String> result = client //
        .get().uri("/processor") //
        .accept(TEXT_EVENT_STREAM) //
        .exchange() //
        .expectStatus().isOk() //
        .expectHeader().contentTypeCompatibleWith(TEXT_EVENT_STREAM) //
        .returnResult(String.class);

    StepVerifier.create(result.getResponseBody()) //
        .expectNext("hello") //
        .thenCancel() //
        .verify();
  }

  @RestController
  static class StringController {

    @GetMapping(value = "/flux", produces = TEXT_EVENT_STREAM_VALUE)
    Flux<String> getFlux() {
      return Flux.just("hello");
    }

    @GetMapping(value = "/processor", produces = TEXT_EVENT_STREAM_VALUE)
    Flux<String> getProcessor() {
      EmitterProcessor<String> processor = EmitterProcessor.create();
      // processor.onNext("hello");
      return processor;
    }

  }
}
...