TL; DR:
Если вы попадаете на том, необходимый для того, чтобы GC столкнулся с проблемой с обработчиками HTTP по умолчанию, пора все равно масштабировать прокси-сервер.
После ответа Нормана я в конце концов попытался сделать очень доступный для совместного использования HTTP кодек / POC-агрегатор POC, чтобы выяснить, было ли это что-то преследовать или нет.
Мой совместимый декодер был далек от RFC 7230 , но он дал мне достаточно запросов для моего текущего проекта.
Затем я использовал httperf и visualvm , чтобы получить представление о GCразница в нагрузкеЗа мои усилия у меня было только 10% снижение скорости GC. Другими словами, это действительно не имеет большого значения.
Единственный реальный эффект был в том, что у меня было на 5% меньше ошибок при выполнении 1000 запросов в секунду по сравнению с использованием упакованного неразделенного кодека HTTP+ агрегатор против моего разделяемого. И это произошло только тогда, когда я выполнял 1000 рэк / сек в течение более 10 секунд.
В конце концов я не собираюсь заниматься этим. Время, необходимое для превращения этого полностью в HTTP-совместимый декодер, для крошечной выгоды, которую можно решить с помощью прокси-сервера, совсем не стоит времени.
Для справочных целей здесь используется комбинированный декодер с совместным использованием. / агрегатор, который я пробовал:
import java.util.concurrent.ConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
@Sharable
public class SharableHttpDecoder extends ChannelInboundHandlerAdapter {
private static final ConcurrentHashMap<ChannelId, SharableHttpRequest> MAP =
new ConcurrentHashMap<ChannelId, SharableHttpRequest>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception
{
if (msg instanceof ByteBuf)
{
ByteBuf buf = (ByteBuf) msg;
ChannelId channelId = ctx.channel().id();
SharableHttpRequest request = MAP.get(channelId);
if (request == null)
{
request = new SharableHttpRequest(buf);
buf.release();
if (request.isComplete())
{
ctx.fireChannelRead(request);
}
else
{
MAP.put(channelId, request);
}
}
else
{
request.append(buf);
buf.release();
if (request.isComplete())
{
ctx.fireChannelRead(request);
}
}
}
else
{
// TODO send 501
System.out.println("WTF is this? " + msg.getClass().getName());
ctx.fireChannelRead(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception
{
System.out.println("Unable to handle request on channel: " +
ctx.channel().id().asLongText());
cause.printStackTrace(System.err);
// TODO send 500
ctx.fireExceptionCaught(cause);
ctx.close();
}
}
Результирующий объект, созданный декодером для обработки на конвейере:
import java.util.Arrays;
import java.util.HashMap;
import io.netty.buffer.ByteBuf;
public class SharableHttpRequest
{
private static final byte SPACE = 32;
private static final byte COLON = 58;
private static final byte CARRAIGE_RETURN = 13;
private HashMap<Header,String> myHeaders;
private Method myMethod;
private String myPath;
private byte[] myBody;
private int myIndex = 0;
public SharableHttpRequest(ByteBuf buf)
{
try
{
myHeaders = new HashMap<Header,String>();
final StringBuilder builder = new StringBuilder(8);
parseRequestLine(buf, builder);
while (parseNextHeader(buf, builder));
parseBody(buf);
}
catch (Exception e)
{
e.printStackTrace(System.err);
}
}
public String getHeader(Header name)
{
return myHeaders.get(name);
}
public Method getMethod()
{
return myMethod;
}
public String getPath()
{
return myPath;
}
public byte[] getBody()
{
return myBody;
}
public boolean isComplete()
{
return myIndex >= myBody.length;
}
public void append(ByteBuf buf)
{
int length = buf.readableBytes();
buf.getBytes(buf.readerIndex(), myBody, myIndex, length);
myIndex += length;
}
private void parseRequestLine(ByteBuf buf, StringBuilder builder)
{
int idx = buf.readerIndex();
int end = buf.writerIndex();
for (; idx < end; ++idx)
{
byte next = buf.getByte(idx);
// break on CR
if (next == CARRAIGE_RETURN)
{
break;
}
// we need the method
else if (myMethod == null)
{
if (next == SPACE)
{
myMethod = Method.fromBuilder(builder);
builder.delete(0, builder.length());
builder.ensureCapacity(100);
}
else
{
builder.append((char) next);
}
}
// we need the path
else if (myPath == null)
{
if (next == SPACE)
{
myPath = builder.toString();
builder.delete(0, builder.length());
}
else
{
builder.append((char) next);
}
}
// don't need the version right now
}
idx += 2; // skip line endings
buf.readerIndex(idx);
}
private boolean parseNextHeader(ByteBuf buf, StringBuilder builder)
{
Header header = null;
int idx = buf.readerIndex();
int end = buf.writerIndex();
for (; idx < end; ++idx)
{
byte next = buf.getByte(idx);
// break on CR
if (next == CARRAIGE_RETURN)
{
if (header != Header.UNHANDLED)
{
myHeaders.put(header,builder.toString());
builder.delete(0, builder.length());
}
break;
}
else if (header == null)
{
// we have the full header name
if (next == COLON)
{
header = Header.fromBuilder(builder);
builder.delete(0, builder.length());
}
// get header name as lower case for mapping purposes
else
{
builder.append(next > 64 && next < 91 ?
(char) ( next | 32 ) : (char) next);
}
}
// we don't care about some headers
else if (header == Header.UNHANDLED)
{
continue;
}
// skip initial spaces
else if (builder.length() == 0 && next == SPACE)
{
continue;
}
// get the header value
else
{
builder.append((char) next);
}
}
idx += 2; // skip line endings
buf.readerIndex(idx);
if (buf.getByte(idx) == CARRAIGE_RETURN)
{
idx += 2; // skip line endings
buf.readerIndex(idx);
return false;
}
else
{
return true;
}
}
private void parseBody(ByteBuf buf)
{
int length = buf.readableBytes();
if (length == 0)
{
myBody = new byte[0];
myIndex = 1;
}
else
{
System.out.println("Content-Length: " + myHeaders.get(Header.CONTENT_LENGTH));
if (myHeaders.get(Header.CONTENT_LENGTH) != null)
{
int totalLength = Integer.valueOf(myHeaders.get(Header.CONTENT_LENGTH));
myBody = new byte[totalLength];
buf.getBytes(buf.readerIndex(), myBody, myIndex, length);
myIndex += length;
}
// TODO handle chunked
}
}
public enum Method
{
GET(new char[]{71, 69, 84}),
POST(new char[]{80, 79, 83, 84}),
UNHANDLED(new char[]{}); // could be expanded if needed
private char[] chars;
Method(char[] chars)
{
this.chars = chars;
}
public static Method fromBuilder(StringBuilder builder)
{
for (Method method : Method.values())
{
if (method.chars.length == builder.length())
{
boolean match = true;
for (int i = 0; i < builder.length(); i++)
{
if (method.chars[i] != builder.charAt(i))
{
match = false;
break;
}
}
if (match)
{
return method;
}
}
}
return null;
}
}
public enum Header
{
HOST(new char[]{104, 111, 115, 116}),
CONNECTION(new char[]{99, 111, 110, 110, 101, 99, 116, 105, 111, 110}),
IF_MODIFIED_SINCE(new char[]{
105, 102, 45, 109, 111, 100, 105, 102, 105, 101, 100, 45, 115,
105, 110, 99, 101}),
COOKIE(new char[]{99, 111, 111, 107, 105, 101}),
CONTENT_LENGTH(new char[]{
99, 111, 110, 116, 101, 110, 116, 45, 108, 101, 110, 103, 116, 104}),
UNHANDLED(new char[]{}); // could be expanded if needed
private char[] chars;
Header(char[] chars)
{
this.chars = chars;
}
public static Header fromBuilder(StringBuilder builder)
{
for (Header header : Header.values())
{
if (header.chars.length == builder.length())
{
boolean match = true;
for (int i = 0; i < builder.length(); i++)
{
if (header.chars[i] != builder.charAt(i))
{
match = false;
break;
}
}
if (match)
{
return header;
}
}
}
return UNHANDLED;
}
}
}
Простой обработчик для тестирования:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
@Sharable
public class SharableHttpHandler extends SimpleChannelInboundHandler<SharableHttpRequest>
{
@Override
protected void channelRead0(ChannelHandlerContext ctx, SharableHttpRequest msg)
throws Exception
{
String message = "HTTP/1.1 200 OK\r\n" +
"Content-type: text/html\r\n" +
"Content-length: 42\r\n\r\n" +
"<html><body>Hello sharedworld</body><html>";
ByteBuf buffer = ctx.alloc().buffer(message.length());
buffer.writeCharSequence(message, CharsetUtil.UTF_8);
ChannelFuture flushPromise = ctx.channel().writeAndFlush(buffer);
flushPromise.addListener(ChannelFutureListener.CLOSE);
if (!flushPromise.isSuccess())
{
flushPromise.cause().printStackTrace(System.err);
}
}
}
Полный конвейер с использованием этих разделяемых обработчиков:
import tests.SharableHttpDecoder;
import tests.SharableHttpHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class ServerPipeline extends ChannelInitializer<SocketChannel>
{
private final SharableHttpDecoder decoder = new SharableHttpDecoder();
private final SharableHttpHandler handler = new SharableHttpHandler();
@Override
public void initChannel(SocketChannel channel)
{
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(decoder);
pipeline.addLast(handler);
}
}
Выше было проверено с этим (более обычным) неразделенным конвейером:
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.CharsetUtil;
public class ServerPipeline extends ChannelInitializer<SocketChannel>
{
@Override
public void initChannel(SocketChannel channel)
{
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new UnsharedHttpHandler());
}
class UnsharedHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest>
{
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception
{
String message = "<html><body>Hello sharedworld</body><html>";
ByteBuf buffer = ctx.alloc().buffer(message.length());
buffer.writeCharSequence(message.toString(), CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buffer);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
HttpUtil.setContentLength(response, response.content().readableBytes());
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
ChannelFuture flushPromise = ctx.writeAndFlush(response);
flushPromise.addListener(ChannelFutureListener.CLOSE);
}
}
}