Как заставить ответ Undertow сбрасывать мульти-данные клиенту по протоколу Http / 2? - PullRequest
0 голосов
/ 05 декабря 2018

Вот мой код сервера :

@PostMapping("stream")
    public Mono<Void> handleStream(ServerHttpRequest request, ServerHttpResponse response) {

        Flux<DataBuffer> body = request.getBody();
        return response.writeAndFlushWith(Flux.just(
                body.map(buffer -> {
                    ByteBuffer copy = buffer.asByteBuffer().duplicate();
                    DataBufferUtils.release(buffer);
                    byte[] data = new byte[copy.remaining()];
                    copy.get(data);
                    return new String(data);
                }).map(string -> {
                    DataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
                    dataBuffer.write(string.getBytes());
                    return dataBuffer;
                }))
        );
    }

Вот моя конфигурация :

server:
    port: 8000
    http2:
        enabled: true
    compression:
        enabled: true
    ssl:
        key-store: classpath:sert.jks
        key-password: ......
        key-store-password: ......
    undertow:
        io-threads: 100
        worker-threads: 100

Мой тестовый класс:

public class StreamHandlerTester {

    private static SSLContext sslContext;
    private static OkHttpClient httpClient;
    private static Integer TIME_OUT = 8000;
    private static SSLSocketFactory sslSocketFactory;
    private static final HostnameVerifier verifiedAllHostname = (hostname, session) -> true;
    private static final TrustManager[] trustAllCerts = new TrustManager[]{
            new X509TrustManager() {
                @Override
                public void checkClientTrusted(X509Certificate[] chain, String authType) {

                }

                @Override
                public void checkServerTrusted(X509Certificate[] chain, String authType) {

                }

                @Override
                public X509Certificate[] getAcceptedIssuers() {

                    return new X509Certificate[]{};
                }
            }
    };

    static {

        try {
            sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, trustAllCerts, new SecureRandom());
        } catch (Exception e) {

        }
        sslSocketFactory = sslContext.getSocketFactory();

        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();

        BiConsumer<Buffer, Long> callBack = (sink, byteCount) -> {
            try {
                while (sink.size() > 0) {
                    byte[] data = sink.readByteArray();
                    log.info("response : {}", new String(data));
                }
            } catch (Exception e) {
                log.error("Read response failed. error msg: ", e);
            }
        };
        clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
                .sslSocketFactory(sslSocketFactory, (X509TrustManager) trustAllCerts[0])
                .hostnameVerifier(verifiedAllHostname)
                .connectTimeout(TIME_OUT, TimeUnit.MILLISECONDS)
                .readTimeout(TIME_OUT, TimeUnit.MILLISECONDS)
                .writeTimeout(TIME_OUT, TimeUnit.MILLISECONDS)
                .retryOnConnectionFailure(true)
                .addNetworkInterceptor(chain -> {
                    Response response = chain.proceed(chain.request());
                    return response.newBuilder().body(new ResponseDTO(response.body(), callBack)).build();
                });
        httpClient = clientBuilder.build();

    }

    public static void main(String[] args) {

        List<String> lists = new ArrayList<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
        RequestBody body = new RequestBody() {
            @Override
            public MediaType contentType() {

                return null;
            }

            @Override
            public void writeTo(BufferedSink bufferedSink) {

                lists.stream().forEach(data -> {
                    try {
                        bufferedSink.write(data.getBytes());
                        bufferedSink.flush();
                        log.info("发送一次数据: {}", data);
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });

            }
        };

        Request.Builder builder = new Request.Builder();
        Request request = builder.url("http://localhost:8000/stream").post(body).headers(buildUpHeader()).build();

        long startTime = System.currentTimeMillis();
        httpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

                log.info("收到结果, 耗时:{}", System.currentTimeMillis() - startTime);
                log.info("收到数据: {}", response.body().string());
            }
        });
    }

    private static Headers buildUpHeader() {

        Map<String, String> headers = new HashMap<>();
        headers.put("Transfer-Encoding", "chunked");
        headers.put("Content-Type", "application/octet-stream");
        headers.put("Accept-Encoding", "gzip,deflate");
        return buildHeader(headers);
    }

    private static Headers buildHeader(Map<String, String> headers) {

        Headers.Builder headerBiulder = new Headers.Builder();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            headerBiulder.add(entry.getKey(), entry.getValue());
        }
        return headerBiulder.build();
    }
}

, когда я начинаю отправлять запрос, результат выглядит так:

20:10:41.722 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 1
20:10:42.235 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 2
20:10:42.736 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 3
20:10:43.239 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 4
20:10:43.739 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 5
20:10:44.240 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 6
20:10:44.741 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 7
20:10:45.241 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 8
20:10:45.742 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 9
20:10:46.243 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 10
20:10:46.746 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 收到结果, 耗时:5095
20:10:46.747 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 1
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 2
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 3
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 4
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 5
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 6
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 7
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 8
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 9
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 10
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 收到数据: 

Process finished with exit code 0

И журнал моего сервера выглядит так:

2018-12-05 20:10:41.986  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 1
2018-12-05 20:10:42.235  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 2
2018-12-05 20:10:42.736  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 3
2018-12-05 20:10:43.237  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 4
2018-12-05 20:10:43.739  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 5
2018-12-05 20:10:44.240  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 6
2018-12-05 20:10:44.741  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 7
2018-12-05 20:10:45.241  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 8
2018-12-05 20:10:45.742  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 9
2018-12-05 20:10:46.243  INFO 4696 --- [  XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler   : 处理数据: 10

AsВы можете видеть из отметки времени в журнале сервера, с точки зрения формы запроса, результаты на стороне сервера, по-видимому, блокируются до получения всех данных запроса, а затем сбрасывают их все на сторону запроса.

Есть ли способобрабатывать данные, а затем отправить их немедленно на стороне запроса?Какие-нибудь советы?Кстати, Undertow является контейнером сервера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...