Ошибка Reactor Netty - обработчик маршрутизации фиксирует неверный контекст? - PullRequest
0 голосов
/ 12 января 2020

Я только что создал простой тест:

  1. Запустите http-сервер
  2. отправьте ему запрос внутри теста
  3. проверьте, был ли он получен и обработан

Проблема -

  • Если я запускаю только один тест или обратный отсчет в контексте обработчика (случай 1) - все работает. Если я запускаю два теста и обратный отсчет не в контексте обработчика (случай 2) - второй тест прерывается. Выходные данные показывают, что второй тест наблюдает защелку, созданную для первого теста (System.identityHashCode указывает, что).
  • Кроме того, создание сервера с помощью RouterFunctions.toHttpHandler (createMailServerBuggy) усугубляет ситуацию - второй тест обнаруживает старую защелку, даже если она ведет обратный отсчет в контексте обработчика.

Есть идеи?

package reactornettytest;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.springframework.web.reactive.function.server.RequestPredicates.accept;

public class ReactorNettyBugTest {

    private DisposableServer mailServer;
    private volatile CountDownLatch latch = null;

    private Mono<ServerResponse> handleIncoming(String body) {
        countdown();
        return ServerResponse.ok().build();
    }

    private void countdown() {
        System.out.println("countdown " + System.identityHashCode(this));
        latch.countDown();
    }

    private DisposableServer createMailServerBuggy() {
        RouterFunction<?> route = RouterFunctions.route()
                .POST("/test",
                        accept(MediaType.APPLICATION_JSON),
                        request -> {
                            //request.bodyToMono(String.class)
                            //        .flatMap(this::handleIncoming)
                            countdown();
                            return ServerResponse.ok().build();
                        }
                )
                .onError(t -> true, (t, r) -> ServerResponse.status(500).build())
                .build();

        ReactorHttpHandlerAdapter adapter =
                new ReactorHttpHandlerAdapter(RouterFunctions.toHttpHandler(route));
        HttpServer server = HttpServer.create().host("localhost").port(8082);
        return server.handle(adapter).bind().block();
    }

    static final byte[] emptyByteArray = new byte[0];
    private DisposableServer createMailServer() {

        HttpServer server = HttpServer
                .create()
                .host("localhost")
                .port(8082)
                .route(r -> r.post("/test", (req, res) -> {
                    Mono<?> dummy = req
                            .receive()
                            .aggregate()
                            .asString()
                            .map(body -> handleIncoming(body));

                    //case 1
                    //countdown();
                    //return res.status(200);

                    //case 2
                    res.status(200);
                    return res.sendByteArray(dummy.map(d -> emptyByteArray));
                }));
        return server.bind().block();
    }

    @Before
    public void setUp() {
        latch = new CountDownLatch(1);
        mailServer = createMailServer();
    }

    @After
    public void tearDown() {
        if (mailServer != null) {
            mailServer.disposeNow();
            mailServer = null;
        }
    }

    @Test
    public void test() {
        WebClient.create()
                .post()
                .uri("http://localhost:8082/test")
                .accept(MediaType.APPLICATION_JSON)
                .bodyValue("{}")
                .retrieve()
                .toBodilessEntity()
                .timeout(Duration.ofMinutes(1))
                .flatMap(body -> {
                    try {
                        System.out.println("check " + System.identityHashCode(this));
                        latch.await(10, TimeUnit.SECONDS);
                        Assert.assertEquals("latch isn't ok", 0, latch.getCount());
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return Mono.just(true);
                }).block();
    }

    @Test
    public void test2() {
        test();
    }
}

...