Я только что создал простой тест:
- Запустите http-сервер
- отправьте ему запрос внутри теста
- проверьте, был ли он получен и обработан
Проблема -
- Если я запускаю только один тест или обратный отсчет в контексте обработчика (случай 1) - все работает. Если я запускаю два теста и обратный отсчет не в контексте обработчика (случай 2) - второй тест прерывается. Выходные данные показывают, что второй тест наблюдает защелку, созданную для первого теста (System.identityHashCode указывает, что).
- Кроме того, создание сервера с помощью RouterFunctions.toHttpHandler (createMailServerBuggy) усугубляет ситуацию - второй тест обнаруживает старую защелку, даже если она ведет обратный отсчет в контексте обработчика.
Есть идеи?
package reactornettytest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
public class ReactorNettyBugTest {
private DisposableServer mailServer;
private volatile CountDownLatch latch = null;
private Mono<ServerResponse> handleIncoming(String body) {
countdown();
return ServerResponse.ok().build();
}
private void countdown() {
System.out.println("countdown " + System.identityHashCode(this));
latch.countDown();
}
private DisposableServer createMailServerBuggy() {
RouterFunction<?> route = RouterFunctions.route()
.POST("/test",
accept(MediaType.APPLICATION_JSON),
request -> {
//request.bodyToMono(String.class)
// .flatMap(this::handleIncoming)
countdown();
return ServerResponse.ok().build();
}
)
.onError(t -> true, (t, r) -> ServerResponse.status(500).build())
.build();
ReactorHttpHandlerAdapter adapter =
new ReactorHttpHandlerAdapter(RouterFunctions.toHttpHandler(route));
HttpServer server = HttpServer.create().host("localhost").port(8082);
return server.handle(adapter).bind().block();
}
static final byte[] emptyByteArray = new byte[0];
private DisposableServer createMailServer() {
HttpServer server = HttpServer
.create()
.host("localhost")
.port(8082)
.route(r -> r.post("/test", (req, res) -> {
Mono<?> dummy = req
.receive()
.aggregate()
.asString()
.map(body -> handleIncoming(body));
//case 1
//countdown();
//return res.status(200);
//case 2
res.status(200);
return res.sendByteArray(dummy.map(d -> emptyByteArray));
}));
return server.bind().block();
}
@Before
public void setUp() {
latch = new CountDownLatch(1);
mailServer = createMailServer();
}
@After
public void tearDown() {
if (mailServer != null) {
mailServer.disposeNow();
mailServer = null;
}
}
@Test
public void test() {
WebClient.create()
.post()
.uri("http://localhost:8082/test")
.accept(MediaType.APPLICATION_JSON)
.bodyValue("{}")
.retrieve()
.toBodilessEntity()
.timeout(Duration.ofMinutes(1))
.flatMap(body -> {
try {
System.out.println("check " + System.identityHashCode(this));
latch.await(10, TimeUnit.SECONDS);
Assert.assertEquals("latch isn't ok", 0, latch.getCount());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return Mono.just(true);
}).block();
}
@Test
public void test2() {
test();
}
}