Netty клиент не получает полные данные, отправленные сервером - PullRequest
0 голосов
/ 22 ноября 2018

Я разрабатываю решение на основе Netty для передачи файла с сервера на клиент по протоколу TCP.Клиент указывает местоположение файла, а затем сервер отправляет файл клиенту.

В настоящее время решение отлично работает для файлов небольшого размера (<2 МБ данных). </p>

Еслиразмер отправляемого файла превышает ~ 5 МБ , отправляются только частичные данные , и это варьируется (каждый раз при отправке одного и того же объема данных).Также из журнала видно, что Сервер отправил полный объем данных (файл).

Проблема в том, что клиент не получает полные данные , отправленные сервером.Что не так в моем коде ниже?или Может кто-то указать мне правильное направление.

Ниже мой клиент, сервер и их обработчики: (Для краткость я перечислил только методы, которые важны)

Клиент:

 public class FileClient {

        private final static int PORT = 8992;
        private final static String HOST = "127.0.0.1";

        public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

            private SslContext sslContext = null;
            private String srcFile = "";
            private String destFile = "";

            public ClientChannelInitializer(String srcFile, String destFile, SslContext sslCtx) {
                this.sslContext = sslCtx;
                this.srcFile = srcFile;
                this.destFile = destFile;
            }

            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(sslContext.newHandler(socketChannel.alloc(), HOST, PORT));
                pipeline.addLast("clientHandler", new FileClientHandler(srcFile, destFile));
            }

        }

        private void startUp(String srcFile, String destFile) throws Exception {
            SslContext sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            EventLoopGroup workerGroup = new NioEventLoopGroup();

                Bootstrap clientBootstrap = new Bootstrap();
                clientBootstrap.group(workerGroup);
                clientBootstrap.channel(NioSocketChannel.class);
                clientBootstrap.option(ChannelOption.TCP_NODELAY, true);
                clientBootstrap.handler(new LoggingHandler(LogLevel.INFO));
                clientBootstrap.handler(new ClientChannelInitializer(srcFile, destFile, sslCtx));

Channel channel = clientBootstrap.connect(new InetSocketAddress(HOST, PORT)).sync().channel();
                channel.closeFuture().sync();
            } 
        }

        public static void main(String[] args) throws Exception {

            String src = "/Users/home/src/test.mp4";
            String dest = "/Users/home/dest/test.mp4";
            new FileClient().startUp(src, dest);
        }

    }

ClientHandler:

public class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {


    private final String sourceFileName;
    private OutputStream outputStream;
    private Path destFilePath;
    private byte[] buffer = new byte[0];



    public FileClientHandler(String SrcFileName, String destFileName) {
        this.sourceFileName = SrcFileName;
        this.destFilePath = Paths.get(destFileName);
        System.out.println("DestFilePath-" + destFilePath);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(ToByteBuff(this.sourceFileName));
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuff) throws Exception {
        if (this.outputStream == null) {
            Files.createDirectories(this.destFilePath.getParent());
            if (Files.exists(this.destFilePath)) {
                Files.delete(this.destFilePath);
            }
            this.outputStream = Files.newOutputStream(this.destFilePath, StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND);
        }

        int size = byteBuff.readableBytes();
        if (size > this.buffer.length) {
            this.buffer = new byte[size];
        }
        byteBuff.readBytes(this.buffer, 0, size);
        this.outputStream.write(this.buffer, 0, size);

    }   

FileServer:

public class FileServer {
    private final int PORT = 8992;

    public void run() throws Exception {
        SelfSignedCertificate ssc = new SelfSignedCertificate();
        final SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(sslCtx.newHandler(ch.alloc()));

                            pipeline.addLast(new ChunkedWriteHandler());
                            pipeline.addLast(new FilServerFileHandler());
                        }
                    });
            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new FileServer().run();
    }
}

FileServerHandler:

public class FilServerFileHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buff) throws Exception {
        String filePathStr = byteBuf.toString(CharsetUtil.UTF_8);

        File file = new File(filePathStr);
        RandomAccessFile raf = null;
        ChannelFuture sendFileFuture;
        try {
            raf = new RandomAccessFile(file, "r");

            sendFileFuture = ctx.writeAndFlush(new ChunkedNioFile(raf.getChannel()),
                    ctx.newProgressivePromise());

            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
                public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                    System.err.println("Transfer complete.");
                }

                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
                        throws Exception {
                    if (total < 0) { // total unknown
                        System.err.println("Transfer progress: " + progress);
                    } else {
                        System.err.println("Transfer progress: " + progress + " / " + total);
                    }
                }
            });

        } catch (FileNotFoundException fnfe) {
        } finally {
            if (raf != null)
                raf.close();
        }
    }

Я проверил SO Q1 и SO Q2

1 Ответ

0 голосов
/ 28 ноября 2018

Исправлена ​​проблема с небольшим изменением в FilServerFileHandler:

public class FileServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buff) throws Exception {
        String filePathStr = buff.toString(CharsetUtil.UTF_8);

        File file = new File(filePathStr);
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        ChannelFuture sendFileFuture;
        try {
            sendFileFuture = ctx.writeAndFlush(new ChunkedNioFile(raf.getChannel()), ctx.newProgressivePromise());
            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
                public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                    System.err.println("Transfer complete.");
                    if (raf != null) {
                        raf.close();
                    }
                }
                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
                        throws Exception {
                    if (total < 0) { // total unknown
                        System.err.println("Transfer progress: " + progress);
                    } else {
                        System.err.println("Transfer progress: " + progress + " / " + total);
                    }
                }
            });
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }
}

Я переместил raf.close() в operationComplete метод.

Частичная транспортировка вызвана закрытиемraf во время операции записи.Обратите внимание, что ctx.writeAndFlush является асинхронным вызовом, поэтому блок raf.close() in finally может быть запущен до завершения операции записи, особенно когда размер файла достаточно велик.

...