Netty Parallel Handler Processing - PullRequest
       14

Netty Parallel Handler Processing

2 голосов
/ 27 апреля 2019

Следуя рекомендациям в других местах, я пытаюсь распараллелить мой последний входящий обработчик в конвейере Netty как таковом

public final class EchoServer {
    private EventLoopGroup group = new NioEventLoopGroup();
    private UnorderedThreadPoolEventExecutor workers = new UnorderedThreadPoolEventExecutor(10);

    public void start(int port) throws InterruptedException {
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel channel) throws Exception {
                            channel.pipeline().addLast(workers, new SimpleChannelInboundHandler<DatagramPacket>() {
                                @Override
                                public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
                                    System.err.println(packet);
                                    // Simulated database delay that I have to wait to occur before repsonding
                                    Thread.sleep(1000);
                                    ctx.write(new DatagramPacket(Unpooled.copiedBuffer("goodbye", StandardCharsets.ISO_8859_1), packet.sender()));
                                }

                                @Override
                                public void channelReadComplete(ChannelHandlerContext ctx) {
                                    ctx.flush();
                                }

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                    cause.printStackTrace();
                                }
                            });
                        }
                    });

            b.bind(port).sync().channel().closeFuture().await();
        } finally {
            group.shutdownGracefully();
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }
}

У меня есть десять клиентов, которые одновременно подключаются в качестве теста, и я измеряю время выполнения для обработки всех запросов. Как и ожидалось с задержкой в ​​1 секунду и последовательным выполнением, это займет чуть более 10 секунд. Я пытаюсь сократить время выполнения до 2 секунд, чтобы доказать параллельную обработку.

Из того, что я понимаю, добавление обработчика в конвейер с явно назначенным исполнителем предполагает параллелизацию работы обработчиков в потоке в исполнителе.

Вместо увеличения производительности я обнаружил, что мой клиент не получает ответы, когда я добавляю параллельную обработку. Спящий поток предназначен для имитации потенциального времени, которое потребуется для записи входящих данных в базу данных. Я делаю что-то явно не так?

1 Ответ

1 голос
/ 28 апреля 2019

Я обошел стороной очевидное отсутствие поддержки Netty для параллельного выполнения окончательной обработки UDP с использованием стандартных механизмов параллелизма Java.

public final class EchoServer {
    private EventLoopGroup group = new NioEventLoopGroup();
    private ExecutorService executors = Executors.newFixedThreadPool(10);

    public void start(int port) throws InterruptedException {
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioDatagramChannel.class).handler(new ChannelInitializer<NioDatagramChannel>() {
                @Override
                protected void initChannel(NioDatagramChannel channel) throws Exception {
                    channel.pipeline().addLast(new SimpleChannelInboundHandler<DatagramPacket>() {
                        @Override
                        public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
                            CompletableFuture.runAsync(() -> {
                                System.err.println(packet);
                                try {
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("goodbye", StandardCharsets.ISO_8859_1),
                                        packet.sender()));
                            }, executors);
                        }

                        @Override
                        public void channelReadComplete(ChannelHandlerContext ctx) {
                            ctx.flush();
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            cause.printStackTrace();
                        }
                    });
                }
            });

            b.bind(port).sync().channel().closeFuture().await(); 
        } finally {
            group.shutdownGracefully();
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }
}
...