Ниже приведен мой код TCPServer
public class TCPServer {
private final int port;
public TCPServer(final int port) {
this.port = port;
}
public void run() throws Exception {
final ExecutorService threadPool = Executors.newCachedThreadPool();
final EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
final EventLoopGroup workerGroup = new NioEventLoopGroup(50, threadPool);
try {
final ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(final SocketChannel ch) throws Exception {
//ch.pipeline().addLast(new ServerRequestHandler());
ch.pipeline().addLast(new ReqMessageDecoder(1024, 0, 2, 0, 2), new ServerRequestHandler());
}
}).option(ChannelOption.SO_BACKLOG, 1000) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
final ChannelFuture f = b.bind(this.port).sync(); // (7) // Start
// the server
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to
// gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
threadPool.shutdown();
}
}
public static void main(final String[] args) throws Exception {
int port = 9090;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TCPServer(port).run();
}
}
MyRequestHandler Class
public class ServerRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) { // (2)
log(" ServerRequestHandler reading message :: " + Thread.currentThread().getName());
ctx.write(msg);
}
}
Мой класс декодера
public class ReqMessageDecoder extends LengthFieldBasedFrameDecoder {
public ReqMessageDecoder(final int maxFrameLength, final int lengthFieldOffset, final int lengthFieldLength,
final int lengthAdjustment, final int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
Ниже приведен пример кода клиента TCP, который отправляетПример сообщения на указанном сервере в цикле 79 раз в последовательном порядке.
public static void main(final String args[]) {
int i = 80;
final String message = "This is a sample message but we can send actual";
final TestClass testClass = new TestClass();
testClass.startConnection("localhost", 9090);
while (i > 1) {
i--;
testClass.sendMessageLength(testClass.getMessageLength(message).getBytes());
testClass.sendMessage(message);
}
testClass.stopConnection();
}
private Socket clientSocket;
private PrintWriter out;
private BufferedReader in;
private OutputStream outputStream;
public String getMessageLength(final String message) {
final int firstByte = message.length() >> 8;
final int secondByte = message.length() - (firstByte << 8);
System.out.println("char 0 " + (char) firstByte + " char 1 " + (char) secondByte);
final String str = new String(new char[] { (char) firstByte, (char) secondByte });
System.out.println("firstByte :: " + firstByte + " secondByte :: " + secondByte + "Str :: " + str);
return str;
}
public void startConnection(final String ip, final int port) {
try {
this.clientSocket = new Socket(ip, port);
this.outputStream = this.clientSocket.getOutputStream();
this.out = new PrintWriter(this.outputStream, true);
this.in = new BufferedReader(new InputStreamReader(this.clientSocket.getInputStream()));
} catch (final UnknownHostException e) {
e.printStackTrace();
} catch (final IOException e) {
e.printStackTrace();
}
}
public String sendMessage(final String msg) {
this.out.println(msg);
String resp = null;
try {
resp = this.in.readLine();
System.out.println("resp :: " + resp);
} catch (final IOException e) {
e.printStackTrace();
}
return resp;
}
public void sendMessageLength(final byte[] msg) {
try {
this.outputStream.write(msg);
} catch (final IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
В моем клиенте я сначала отправляю длину сообщения в 2 байта (см. getMessageLength
), а затем фактическуюсообщение (см. sendMessage
) в потоке вывода сокета.Теперь, когда сервер получает сообщение, оно печатает ниже.
Вывод:
channel Active
ServerRequestHandler reading message :: pool-1-thread-1 message recieved :: This is a sample message but we can send actualmessage ended
exception caught
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 2562 - discarded
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:522)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:500)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.exceededFrameLength(LengthFieldBasedFrameDecoder.java:387)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:430)
at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
channel read complete
channel inactive
В моем коде я не отправляю данные размером более 50 байтов, но не уверен, почему TCPserver отказывает, чторазмер кадра больше 1024. Я подозреваю, что расчет длины работает не так, как ожидалось.
Я новичок в Netty и у меня тоже есть несколько дополнительных вопросов.
channelRead
метод вызывается в отдельном потоке для каждого полученного сообщения? - Вызывается ли метод декодирования в блокирующем потоке, где он читает (n) байтов из потока TCP / IP блокирующим образом и передаетконтроль для requestHandler, чтобы сделать
channelRead
? - Как принимать только те соединения, которые созданы с использованием защищенного сокета на стороне сервера?
- Я хотел бы обрабатывать сообщения, поступающие от одного клиентав параллельных потоках, чтобы увеличить пропускную способность моего сервера ... я должен сделать что-нибудь дополнительное (порождение потоков в методе
channelRead
?) илиNioEventLoopGroup поддерживает это из коробки?