У меня есть сервер Netty, который получает запрос POST от клиента с полезной нагрузкой json, аналогичной:
{
"url": [
"https://www.google.com",
"https://www.yahoo.com",
"https://www.duckduckgo.com"
]
}
Затем сервер Netty отправит запрос GET на каждый отдельный URL-адрес в json "url"массив и отправить клиенту тело ответа каждого из этих запросов.
Следующий код делает именно это.
private void handleJsonRequest(JsonNode requestNode, Channel channel) throws ParseRequestException {
List<AuctionRequest> requestList = parseRequestJson(requestNode, getRequestStartTime(channel));
requestList
.stream()
.map(auctionRequest -> {
log.info("Going to send a request");
CompletableFuture<AuctionRequestResult> response = getHttpClient().performRequest(auctionRequest);
return response;
})
.forEach(future -> {
onRequestComplete(channel, future);
});
}
Но я хочу отправить клиенту каждый ответ от запроса GETотправляется на каждый URL, когда они становятся доступными.Пример:
- Клиент отправляет список URL-адресов (http://www.google.com, http://www.yahoo.com и http://www.duckduckgo.com) на сервер Netty
- Netty отправляетзапрос GET на http://www.google.com, http://www.yahoo.com и на http://www.duckduckgo.com
- Через несколько миллисекунд Нетти получает ответ от GET http://www.google.com, но еще нет из двух других URL-адресов. Итак, теперь я хочу отправить ответ от GET http://www.google.com клиенту и все еще ждать ответа GET от двух других URL-адресов.
- Через одну секунду после Nettyполучить ответ от GET https://www.duckduckgo.com, и я хочу использовать то же соединение, которое использовалось в пункте 2, для отправки клиенту тела ответа от GET https://www.duckduckgo.com
- Через две секунды Нетти получаетполучить ответ от GET https://www.yahoo.com и выполнить то же действие, что и на шаге 3, закрыть канал и завершить ответ клиенту.
На диаграмме ниже показан поток запросов
Я знаю, что это хороший вариант использования для серверной части events или websockets, но пока это невозможно.
Я пытался использовать multi-part
ответ и использовать octet/stream
заголовки, но клиенту отправляется только первый ответ, любые последующие ответы не полученыклиентом.Сейчас я пытаюсь следовать принципу чанкового ответа, следуя этим инструкциям из другого вопроса StackOverflow , но безуспешно.Клиент получает только первый ответ от 3 (или более) запросов, отправленных Netty.
Так я реализовал метод onComplete
, который обрабатывает каждый ответ, полученный Netty
private void onRequestComplete(Channel channel, CompletableFuture<AuctionRequestResult> future) {
future.whenComplete((requestResult, throwable) -> {
if (throwable != null) {
log.debug("Writing error response for request {}", future.hashCode());
sendError(throwable.getMessage(), throwable, channel);
log.debug("Finished writing error response for request {}", future.hashCode());
return;
}
log.info("Writing response for request {}", future.hashCode());
writeRequestResult(requestResult, channel);
log.info("Finished writing response for request {}", future.hashCode());
});
}
private void writeRequestResult(AuctionRequestResult requestResult, Channel channel) {
ByteBuf buf = getByteBufAllocator().directBuffer(512000);
JsonWriter jsonWriter = getJsonWriter();
try {
// Create DTO and serialize it
ProxyResponseDto proxyResponseDto = new ProxyResponseDto("ok", requestResult);
dslJson.serialize(jsonWriter, proxyResponseDto);
// Write response to the client
writeResponse(channel, jsonWriter.getByteBuffer(), jsonWriter.size(), false);
}
catch (Exception e) {
log.warn(e.getMessage());
closeChannel(channel);
}
finally {
buf.release();
jsonWriter.reset();
jsonWriters.offer(jsonWriter);
}
}
private void writeResponse(Channel channel, byte[] responseJsonUtf8, int length) {
String responseJson = new String(responseJsonUtf8);
ByteBuf buf = Unpooled.copiedBuffer(responseJsonUtf8, 0, responseJsonUtf8.length);
FullHttpRequest request = getRequestObject(channel);
log.info("Writing response back in the channel");
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(TRANSFER_ENCODING, CHUNKED);
response.headers().set(HttpHeaderNames.CONNECTION, "keep-alive");
ChunkedInput<ByteBuf> chunkedInput = new ChunkedStream(new ByteBufInputStream(buf));
HttpChunkedInput httpChunkedInput = new HttpChunkedInput(chunkedInput);
// Write the response.
if (channel.isActive()) {
channel.write(response);
ChannelFuture future = channel.writeAndFlush(httpChunkedInput);
future.addListener(listener -> {
if(listener.isSuccess()) {
log.info("Wrote data to channel with success!!!");
}
else {
log.error("Error when writting data to channel");
Throwable cause = listener.cause();
if(cause != null) {
log.error("Error reason: ", cause);
}
}
});
}
else {
log.warn("Exchange connection went dead. Ms from start: {}", (System.currentTimeMillis() - getRequestStartTime(channel)));
closeChannel(channel);
}
}
Это вывод журнала.Как видите, первый ответ успешно отправляется из Netty клиенту, но любая дальнейшая попытка выдает исключение
Calling onComplete
End of calling onComplete
Writing response for request 1897031616
Writing response back in the channel
Finished writing response for request 1897031616
Wrote data to channel with success!!!
Writing response for request -990888115
Writing response back in the channel
Error when writting data to channel
Error reason:
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: io.netty.handler.codec.http.DefaultHttpContent (expected: HttpResponse)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107) ~[netty-codec-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.MessageToMessageCodec.write(MessageToMessageCodec.java:116) ~[netty-codec-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:708) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:791) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:701) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:696) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:276) ~[netty-handler-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:135) ~[netty-handler-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$2100(AbstractChannelHandlerContext.java:56) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1150) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1073) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:338) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at java.lang.Thread.run(Thread.java:825) [?:?]
Caused by: java.lang.IllegalStateException: unexpected message type: io.netty.handler.codec.http.DefaultHttpContent (expected: HttpResponse)
at io.netty.handler.codec.http.HttpContentEncoder.ensureHeaders(HttpContentEncoder.java:241) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.http.HttpContentEncoder.encode(HttpContentEncoder.java:97) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.http.HttpContentEncoder.encode(HttpContentEncoder.java:52) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.MessageToMessageCodec$1.encode(MessageToMessageCodec.java:67) ~[netty-codec-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89) ~[netty-codec-4.1.36.Final.jar:4.1.36.Final]
... 20 more
Мой вопрос заключается в том, может ли кто-нибудь указать мне правильное направление при реализации push / streamподходить и отправлять данные с сервера Netty клиенту по мере их поступления без использования веб-сокетов или событий на стороне сервера?