Нетти или сокет отправляют сообщение слишком быстро, что приводит к потере пакетов данных? - PullRequest
0 голосов
/ 12 ноября 2018

Я использую netty для отправки файлов данных (master) другим двум ведомым (все ОС Linux), отправляю файлы следующим образом: Мастер отправляет 8 файлов (размер: 5 ~ 200 м или около того), используя Netty NioServerSocketChannel:

       if (file.exists() && file.isFile()) {
            try {
                int count = 0;
                int cacheLen = 40960;
                if (file.length() % cacheLen == 0) {
                    count = (int) (file.length() / cacheLen);
                } else {
                    count = (int) (file.length() / cacheLen) + 1;
                }

                FileInputStream in = new FileInputStream(file);
                byte[] buffer = new byte[cacheLen];
                int readed;
                int index = 0;
                while ((readed = in.read(buffer)) != -1) {
                    byte[] tmp = new byte[readed];
                    System.arraycopy(buffer,0,tmp,0,readed);
                    FilePack filePack = new FilePack(file.getName(), index, count, tmp);
                    sendMessage(channel, filePack);

                    index++;
                }
                log.info("master to:{},file:{},total :{},sended: {}" ,channel.remoteAddress().toString(),file.getName(),count,index);


            } catch (Throwable e) {
                log.error("send file error", e);
                sendErrInfo(channel,file);

            }
        }

способ отправки такой:

public SyncGetFuture send(Channel channel, Object message, Long sessionId) throws NetException {

        if(!channel.isActive()) {
            this.getChannelHub().removeChannelIfDisconnected(channel);
            throw new NetException("Failed to send message " + message + " to " + channel.remoteAddress() + ", cause: channel is not active.");
        } else if(!channel.isWritable()) {
            throw new NetException("Failed to send message " + message + " to " + channel.remoteAddress() + ", cause: channel is not writable.");
        } else {
            //new an packet
            Packet packet = PacketCreator.create(channel.getVersion(), PacketType.BUSINESS, sessionId, message);
            try {
                nettyChannel.writeAndFlush(packet);
                return sessionId != null?this.getSyncFutureManager().newFuture(sessionId):null;
            } catch (Throwable var7) {
                throw new NetException("Failed to send message " + message + " to " + channel.remoteAddress() + ", cause: " + var7.getMessage(), var7);
            }
        }
    }

netty сервер:

this.bossGroup = new NioEventLoopGroup(this.configuration.getBossThreads());
            this.workerGroup = new NioEventLoopGroup(this.configuration.getWorkerThreads());
            final NettyServerHandler4 nettyServerHandler = new NettyServerHandler4(this.configuration, this, (ChannelHub4)this.getChannelHub());
            ServerBootstrap b = new ServerBootstrap();
            ((ServerBootstrap)((ServerBootstrap)b.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class)).handler(new LoggingHandler(LogLevel.INFO))).childHandler(new ChannelInitializer() {
                public void initChannel(SocketChannel ch) throws IOException {
                    SocketConfig config = SocketServer4.this.configuration.getSocketConfig();
                    if(config.getWriteBufferHighWaterMark() != null) {
                        ch.config().setOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(config.getWriteBufferHighWaterMark().intValue()));
                    }

                    if(config.getWriteBufferLowWaterMark() != null) {
                        ch.config().setOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(config.getWriteBufferLowWaterMark().intValue()));
                    }

                    ch.pipeline().addLast("messageDecoder", new PacketDecoder(1048576));
                    ch.pipeline().addLast("messageEncoder", new PacketEncoder());
                    ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, SocketServer4.this.getConfiguration().getAllIdleTimeSeconds() * 2));
                    ch.pipeline().addLast("nettyServerHandler", nettyServerHandler);
                }
            });

Netty клиент:

final NettyClientHandler4 stop = new NettyClientHandler4(this);
            Bootstrap i$ = new Bootstrap();
            ((Bootstrap)((Bootstrap)((Bootstrap)i$.group(this.group)).channel(NioSocketChannel.class)).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(true))).handler(new ChannelInitializer() {
                public void initChannel(SocketChannel ch) throws Exception {
                    SocketConfig config = SocketClient4.this.getConfiguration().getSocketConfig();
                    if(config.getWriteBufferHighWaterMark() != null) {
                        ch.config().setOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(config.getWriteBufferHighWaterMark().intValue()));
                    }

                    if(config.getWriteBufferLowWaterMark() != null) {
                        ch.config().setOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(config.getWriteBufferLowWaterMark().intValue()));
                    }

                    ch.pipeline().addLast("messageDecoder", new PacketDecoder(2147483647));
                    ch.pipeline().addLast("messageEncoder", new PacketEncoder());
                    ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, SocketClient4.this.getConfiguration().getAllIdleTimeSeconds()));
                    ch.pipeline().addLast("heartBeatHandler", new HeartBeatReqHandler());
                    ch.pipeline().addLast("nettyClientHandler", stop);
                }
            });

Последние два файла или три файла всегда теряют данные на ведомом устройстве, когда мастер отправляет все файлы одновременно; но они могут быть успешно загружены, когда ведомые пытаются получить эти файлы специально в следующий раз;

Данные журнала показывают потерю, file7 и file8 неожиданно получают последний пакет:

16:**:**.*** logback [nioEventLoopGroup-2-1] INFO  c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file7,total:6643,cout:0
.......(downloading 1,2,3,4...5487 correctly)
16:31:18.080 logback [nioEventLoopGroup-2-1] INFO  c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file7,total:6643,cout:5488
16:31:18.097 logback [nioEventLoopGroup-2-1] INFO  c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file7,total:6643,cout:6642
16:31:18.100 logback [nioEventLoopGroup-2-1] INFO  c.j.c.f.c.l.c.PacketReceiveListener - get file packets,fileName:file8,total:98,cout:97

независимо от того, ведущий или ведомый, журналы не генерировали никаких исключений;

когда я и сплю, пока в файл отправляю, все нормально работает:

          while ((readed = in.read(buffer)) != -1) {
                    byte[] tmp = new byte[readed];
                    System.arraycopy(buffer,0,tmp,0,readed);
                    FilePack filePack = new FilePack(file.getName(), index, count, tmp);
                    sendMessage(channel, filePack);
                    Thread.sleep(10);
                    index++;
                }
...