Вот мой код сервера :
@PostMapping("stream")
public Mono<Void> handleStream(ServerHttpRequest request, ServerHttpResponse response) {
Flux<DataBuffer> body = request.getBody();
return response.writeAndFlushWith(Flux.just(
body.map(buffer -> {
ByteBuffer copy = buffer.asByteBuffer().duplicate();
DataBufferUtils.release(buffer);
byte[] data = new byte[copy.remaining()];
copy.get(data);
return new String(data);
}).map(string -> {
DataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
dataBuffer.write(string.getBytes());
return dataBuffer;
}))
);
}
Вот моя конфигурация :
server:
port: 8000
http2:
enabled: true
compression:
enabled: true
ssl:
key-store: classpath:sert.jks
key-password: ......
key-store-password: ......
undertow:
io-threads: 100
worker-threads: 100
Мой тестовый класс:
public class StreamHandlerTester {
private static SSLContext sslContext;
private static OkHttpClient httpClient;
private static Integer TIME_OUT = 8000;
private static SSLSocketFactory sslSocketFactory;
private static final HostnameVerifier verifiedAllHostname = (hostname, session) -> true;
private static final TrustManager[] trustAllCerts = new TrustManager[]{
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[]{};
}
}
};
static {
try {
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustAllCerts, new SecureRandom());
} catch (Exception e) {
}
sslSocketFactory = sslContext.getSocketFactory();
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
BiConsumer<Buffer, Long> callBack = (sink, byteCount) -> {
try {
while (sink.size() > 0) {
byte[] data = sink.readByteArray();
log.info("response : {}", new String(data));
}
} catch (Exception e) {
log.error("Read response failed. error msg: ", e);
}
};
clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1))
.sslSocketFactory(sslSocketFactory, (X509TrustManager) trustAllCerts[0])
.hostnameVerifier(verifiedAllHostname)
.connectTimeout(TIME_OUT, TimeUnit.MILLISECONDS)
.readTimeout(TIME_OUT, TimeUnit.MILLISECONDS)
.writeTimeout(TIME_OUT, TimeUnit.MILLISECONDS)
.retryOnConnectionFailure(true)
.addNetworkInterceptor(chain -> {
Response response = chain.proceed(chain.request());
return response.newBuilder().body(new ResponseDTO(response.body(), callBack)).build();
});
httpClient = clientBuilder.build();
}
public static void main(String[] args) {
List<String> lists = new ArrayList<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
RequestBody body = new RequestBody() {
@Override
public MediaType contentType() {
return null;
}
@Override
public void writeTo(BufferedSink bufferedSink) {
lists.stream().forEach(data -> {
try {
bufferedSink.write(data.getBytes());
bufferedSink.flush();
log.info("发送一次数据: {}", data);
TimeUnit.MILLISECONDS.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
});
}
};
Request.Builder builder = new Request.Builder();
Request request = builder.url("http://localhost:8000/stream").post(body).headers(buildUpHeader()).build();
long startTime = System.currentTimeMillis();
httpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
log.info("收到结果, 耗时:{}", System.currentTimeMillis() - startTime);
log.info("收到数据: {}", response.body().string());
}
});
}
private static Headers buildUpHeader() {
Map<String, String> headers = new HashMap<>();
headers.put("Transfer-Encoding", "chunked");
headers.put("Content-Type", "application/octet-stream");
headers.put("Accept-Encoding", "gzip,deflate");
return buildHeader(headers);
}
private static Headers buildHeader(Map<String, String> headers) {
Headers.Builder headerBiulder = new Headers.Builder();
for (Map.Entry<String, String> entry : headers.entrySet()) {
headerBiulder.add(entry.getKey(), entry.getValue());
}
return headerBiulder.build();
}
}
, когда я начинаю отправлять запрос, результат выглядит так:
20:10:41.722 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 1
20:10:42.235 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 2
20:10:42.736 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 3
20:10:43.239 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 4
20:10:43.739 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 5
20:10:44.240 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 6
20:10:44.741 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 7
20:10:45.241 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 8
20:10:45.742 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 9
20:10:46.243 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 发送一次数据: 10
20:10:46.746 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 收到结果, 耗时:5095
20:10:46.747 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 1
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 2
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 3
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 4
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 5
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 6
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 7
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 8
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 9
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - response : 10
20:10:46.748 [OkHttp http://localhost:8000/...] INFO com.itachy.webflux.test.StreamHandlerTester - 收到数据:
Process finished with exit code 0
И журнал моего сервера выглядит так:
2018-12-05 20:10:41.986 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 1
2018-12-05 20:10:42.235 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 2
2018-12-05 20:10:42.736 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 3
2018-12-05 20:10:43.237 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 4
2018-12-05 20:10:43.739 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 5
2018-12-05 20:10:44.240 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 6
2018-12-05 20:10:44.741 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 7
2018-12-05 20:10:45.241 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 8
2018-12-05 20:10:45.742 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 9
2018-12-05 20:10:46.243 INFO 4696 --- [ XNIO-1 I/O-11] c.itachy.webflux.handler.StreamHandler : 处理数据: 10
AsВы можете видеть из отметки времени в журнале сервера, с точки зрения формы запроса, результаты на стороне сервера, по-видимому, блокируются до получения всех данных запроса, а затем сбрасывают их все на сторону запроса.
Есть ли способобрабатывать данные, а затем отправить их немедленно на стороне запроса?Какие-нибудь советы?Кстати, Undertow является контейнером сервера.