Netty передача файлов вызывает исключение - PullRequest
0 голосов
/ 18 мая 2018

Я пытаюсь передать файлы через мою реализацию netty.Мой пользовательский декодер и кодировщик оба знают два типа объектов: String и FileChunk, который на самом деле содержит индекс чанка и его содержимое как byte[].Передача работает следующим образом:

  1. Client0 отправляет Client1 объект JsonObject как String, содержащий путь, в котором должен быть сохранен файл, его размер, сколько фрагментов будет отправлено и т. Д.
  2. Client0 отправляет первый фрагмент как FileChunk и блокирует поток для ожидания ответа, что Client1 успешно получил свой пакет.
  3. Client1 получил фрагмент файла и записывает его на диск.После этого он отправляет успешный пакет на Client0
  4. Client0 получает успешный пакет и отправляет следующий фрагмент.

Этот процесс должен продолжаться до тех пор, пока файл не будет передан.И это работает. Если я добавлю задержку в 1 секунду перед отправкой фрагмента файла размером 64 КБ! Derp.

Кажется, что ошибки нет - но она не работает при большой нагрузке и без блокировки потоков.Нужно ли где-нибудь очищать буферы или мне нужна копия?Пожалуйста, помогите ... Если вы заинтересовались примером проекта с IntelliJ и Maven, дайте мне знать в комментариях, я подготовлю его.

Достаточно объяснено.Вот код!

FileTransfer Runnable

public class FileTransfer implements Runnable {

    private FileSlicer slicer;
    private Client client;

    public FileTransfer(FileSlicer slicer, Client client) {
        this.slicer = slicer;
        this.client = client;
    }

    public void run() {

        synchronized(this) {

            while(slicer.hasNext()) {

                try {
                    client.getContext().writeAndFlush(slicer.getNextSlice());
                    this.wait(); //Unblocked when success packet received, works
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }
}

Channel-Initializer (размер буфера установлен здесь, по умолчанию не должен быть переполнен)

@Override
protected void initChannel(Channel channel) throws Exception {

    channel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(1024 * 65));

    ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast(new PacketDecoder());
    pipeline.addLast(new PacketEncoder());
    pipeline.addLast(new ChannelEncoder());
    pipeline.addLast(new ServerHandler());

}

Декодер (определяет, является ли он String или FileChunk и анализирует его):

public class PacketDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> output) throws Exception {

        int type = buf.readInt();
        if (buf.readableBytes() <= 0) return;
        byte[] buffer;

        switch (type) {

            case 0:
                buffer = buf.readBytes(buf.readInt()).array();
                output.add(new String(buffer));
                break;

            case 1:
                int read = buf.readInt();
                buffer = buf.readBytes(buf.readInt()).array();
                output.add(new FileChunk(buffer, read));
                break;

            default:
                System.out.println("Unknown Decodec.");
                break;

        }

    }

}

Stacktrace :

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(12) + length(65536) exceeds writerIndex(40960): PooledUnsafeDirectByteBuf(ridx: 12, widx: 40960, cap: 66560)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:347)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:331)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
    at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
    at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
    ...

1 Ответ

0 голосов
/ 18 мая 2018

Функция декодирования предполагает, что весь файл доступен, однако буфер может содержать только часть данных, в зависимости от того, сколько потока было получено.

Одним из способов решения этой проблемы является добавление кадрадекодер, такой как LengthFieldBasedFrameDecoder , который сегментирует поток в соответствии с полем длины в сообщении, например, в вашем примере.

Другой вариант заключается в использовании того же подхода, что и в примере файлового сервера.в документах: https://netty.io/4.1/xref/io/netty/example/http/file/package-summary.html

...