Netty TCPServer не читает точный кадр из канала, используя декодер - PullRequest
0 голосов
/ 10 декабря 2018

Ниже приведен мой код TCPServer

public class TCPServer {

private final int port;

public TCPServer(final int port) {
    this.port = port;
}

public void run() throws Exception {
    final ExecutorService threadPool = Executors.newCachedThreadPool();
    final EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
    final EventLoopGroup workerGroup = new NioEventLoopGroup(50, threadPool);
    try {
        final ServerBootstrap b = new ServerBootstrap(); // (2)

        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3)
                .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                    @Override
                    public void initChannel(final SocketChannel ch) throws Exception {
                         //ch.pipeline().addLast(new ServerRequestHandler());
                        ch.pipeline().addLast(new ReqMessageDecoder(1024, 0, 2, 0, 2), new ServerRequestHandler());
                    }
                }).option(ChannelOption.SO_BACKLOG, 1000) // (5)
                .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

        // Bind and start to accept incoming connections.
        final ChannelFuture f = b.bind(this.port).sync(); // (7) // Start
                                                          // the server

        // Wait until the server socket is closed.
        // In this example, this does not happen, but you can do that to
        // gracefully
        // shut down your server.
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
        threadPool.shutdown();
    }
}

public static void main(final String[] args) throws Exception {
    int port = 9090;
    if (args.length > 0) {
        port = Integer.parseInt(args[0]);
    }

    new TCPServer(port).run();
}
}

MyRequestHandler Class

public class ServerRequestHandler extends ChannelInboundHandlerAdapter {


@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) { // (2)

    log(" ServerRequestHandler reading message :: " + Thread.currentThread().getName());

    ctx.write(msg);

}
}

Мой класс декодера

public class ReqMessageDecoder extends LengthFieldBasedFrameDecoder {

public ReqMessageDecoder(final int maxFrameLength, final int lengthFieldOffset, final int lengthFieldLength,
            final int lengthAdjustment, final int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

}

Ниже приведен пример кода клиента TCP, который отправляетПример сообщения на указанном сервере в цикле 79 раз в последовательном порядке.

public static void main(final String args[]) {
        int i = 80;
        final String message = "This is a sample message but we can send actual";
        final TestClass testClass = new TestClass();
        testClass.startConnection("localhost", 9090);
        while (i > 1) {
            i--;
            testClass.sendMessageLength(testClass.getMessageLength(message).getBytes());
            testClass.sendMessage(message);

        }
        testClass.stopConnection();
    }

private Socket clientSocket;
private PrintWriter out;
private BufferedReader in;
private OutputStream outputStream;

public String getMessageLength(final String message) {
    final int firstByte = message.length() >> 8;
    final int secondByte = message.length() - (firstByte << 8);
    System.out.println("char 0 " + (char) firstByte + " char 1 " + (char) secondByte);
    final String str = new String(new char[] { (char) firstByte, (char) secondByte });
    System.out.println("firstByte :: " + firstByte + " secondByte :: " + secondByte + "Str :: " + str);
    return str;
}

public void startConnection(final String ip, final int port) {
    try {
        this.clientSocket = new Socket(ip, port);
this.outputStream = this.clientSocket.getOutputStream();
        this.out = new PrintWriter(this.outputStream, true);
        this.in = new BufferedReader(new InputStreamReader(this.clientSocket.getInputStream()));
    } catch (final UnknownHostException e) {
        e.printStackTrace();
    } catch (final IOException e) {
        e.printStackTrace();
    }

}

public String sendMessage(final String msg) {
    this.out.println(msg);
    String resp = null;
    try {
        resp = this.in.readLine();
        System.out.println("resp :: " + resp);
    } catch (final IOException e) {
        e.printStackTrace();
    }
    return resp;
}

 public void sendMessageLength(final byte[] msg) {
    try {
        this.outputStream.write(msg);
    } catch (final IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

}

В моем клиенте я сначала отправляю длину сообщения в 2 байта (см. getMessageLength), а затем фактическуюсообщение (см. sendMessage) в потоке вывода сокета.Теперь, когда сервер получает сообщение, оно печатает ниже.

Вывод:

    channel Active
 ServerRequestHandler reading message :: pool-1-thread-1 message recieved :: This is a sample message but we can send actualmessage ended
exception caught
io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 2562 - discarded
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:522)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:500)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.exceededFrameLength(LengthFieldBasedFrameDecoder.java:387)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:430)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
channel read complete
channel inactive

В моем коде я не отправляю данные размером более 50 байтов, но не уверен, почему TCPserver отказывает, чторазмер кадра больше 1024. Я подозреваю, что расчет длины работает не так, как ожидалось.

Я новичок в Netty и у меня тоже есть несколько дополнительных вопросов.

  1. channelRead метод вызывается в отдельном потоке для каждого полученного сообщения?
  2. Вызывается ли метод декодирования в блокирующем потоке, где он читает (n) байтов из потока TCP / IP блокирующим образом и передаетконтроль для requestHandler, чтобы сделать channelRead?
  3. Как принимать только те соединения, которые созданы с использованием защищенного сокета на стороне сервера?
  4. Я хотел бы обрабатывать сообщения, поступающие от одного клиентав параллельных потоках, чтобы увеличить пропускную способность моего сервера ... я должен сделать что-нибудь дополнительное (порождение потоков в методе channelRead?) илиNioEventLoopGroup поддерживает это из коробки?

Ответы [ 2 ]

0 голосов
/ 03 января 2019

Таким образом, после стольких проб и ошибок и прохождения различных ссылок, я решил этот вопрос, и мой PoC не завершен.

new ReqLengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2, -2, 2) 
// This will read 1st 2 bytes as length and will skip those to read the next 
// bytes (i.e. length mentioned in 1st 2 bytes)

Вызывается ли метод channelRead в отдельном потоке для каждого сообщенияон получает?

Ответ: Для этого вам нужно сделать что-то, как показано ниже.

pipeline.addLast(this.threadPool, new ServerRequestHandler());
// this.threadPool refers to UnorderedThreadPoolEventExecutor 
// ServerRequestHandler extends SimpleChannelInboundHandler<T>

см. Netty 4. Параллельная обработка после ByteToMessageCodec

Вызывается ли метод decode в блокирующем потоке, где он считывает (n) байтов из потока TCP / IP в режиме блокировки и передает управление requestHandler для выполнения channelRead?

Ответ: Да, но эти потокиотличаются от потоков ввода / вывода.Эти потоки работают на конвейере канала.

Как принимать только те соединения, которые созданы с использованием защищенного сокета на стороне сервера?

Ответ: см. Ниже базовую настройку

// some dummy SSL setup. actual production code will have some more
// sophisticated steps.
final SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
pipeline.addLast(sslCtx.newHandler(ch.alloc()));

Я хотел бы обрабатывать сообщения, поступающие от одного клиента в параллельных потоках, чтобы увеличить пропускную способность моего сервера ... Должен ли я делать что-то дополнительное (порождение потоков в методе channelRead?) Или NioEventLoopGroup поддерживает это из коробки?

Ответ: Пожалуйста, обратитесь к Многопоточность Netty для каждого соединения & Netty 4. Параллельная обработка после ByteToMessageCodec

0 голосов
/ 11 декабря 2018
ch.pipeline().addLast(new ReqMessageDecoder(1024, 0, 2, 0, 2), new ServerRequestHandler());

Ваш ReqMessageDecoder ожидает, что максимальная длина фрейма будет 1024, и все, что может вызвать исключение.Как вы определяете границы своих фреймов?граница кадра сообщит вашему декодеру, что полный кадр получен, и он может быть обработан вашим кодом приложения, в противном случае декодер будет продолжать считывать входящие данные, пока не достигнет максимального размера кадра (например, 1024) или символа / последовательности границы кадраполучено.

См. Следующую ссылку для встроенного декодера Netty в конце строки, который вы можете добавить к своему каналу DelimiterBasedFrameDecoder.

...