Java AsyncHttpClient: поврежденный файл при записи из LazyResponseBodyPart в AsynchronousFileChannel - PullRequest
6 голосов
/ 31 мая 2019

Я использую библиотеку AsyncHttpClient для асинхронных неблокирующих запросов. Мой случай: записать данные в файл по мере его поступления по сети.

Для загрузки файла с удаленного хоста и сохранения в файл я использовал значения по умолчанию ResponseBodyPartFactory.EAGER и AsynchronousFileChannel, чтобы не блокировать поток нетто при поступлении данных. Но, как показали мои измерения, по сравнению с LAZY потребление памяти в куче Java увеличивается во много раз.

Поэтому я решил сразу перейти к LAZY, но не учел последствия для файлов.

Этот код поможет воспроизвести проблему .:

public static class AsyncChannelWriter {
     private final CompletableFuture<Integer> startPosition;
     private final AsynchronousFileChannel channel;

     public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
         this.channel = channel;
         this.startPosition = CompletableFuture.completedFuture((int) channel.size());
     }

     public CompletableFuture<Integer> getStartPosition() {
         return startPosition;
     }

     public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {

         return currentPosition.thenCompose(position -> {
             CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
             channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
                 @Override
                 public void completed(Integer result, ByteBuffer attachment) {
                     writenBytes.complete(result);
                 }

                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) {
                     writenBytes.completeExceptionally(exc);
                 }
             });
             return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
         });
     }

     public void close(CompletableFuture<Integer> currentPosition) {
         currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
     }
 }

 public static void main(String[] args) throws IOException {
     final String filepath = "/media/veracrypt4/files/1.jpg";
     final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";

     final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
             .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
     final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
     final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
     final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
     client.prepareGet(downloadUrl)
             .execute(new AsyncCompletionHandler<Response>() {

                 @Override
                 public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
                     final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
                     final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
                     final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
                     atomicReferencePosition.set(newPosition);
                     return State.CONTINUE;
                 }

                 @Override
                 public Response onCompleted(Response response) {
                     asyncChannelWriter.close(atomicReferencePosition.get());
                     return response;
                 }
             });
}

в этом случае изображение не работает. Но если я использую FileChannel вместо AsynchronousFileChannel, в обоих случаях файлы получаются нормальными. Могут ли быть нюансы при работе с DirectByteBuffer (в случае с LazyResponseBodyPart.getBodyByteBuffer()) и AsynchronousFileChannel?

Что может быть не так с моим кодом, если все нормально работает с EAGER?


UPDATE

Как я заметил, если я использую LAZY, и, например, я добавляю строку Thread.sleep (10) в методе onBodyPartReceived, вот так:

 @Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
    final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
    final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
    final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
    atomicReferencePosition.set(newPosition);
    Thread.sleep(10);
    return State.CONTINUE;
}

Файл сохраняется на диске в нерабочем состоянии.

Насколько я понимаю, причина в том, что в течение этих 10 миллисекунд асинхронному потоку в AsynchronousFileChannel удается записать данные на диск с этого DirectByteBuffer.

Оказывается, файл поврежден из-за того, что этот асинхронный поток использует этот буфер для записи вместе с потоком netty.

Если мы посмотрим на исходный код с EagerResponseBodyPart, то увидим следующее

private final byte[] bytes;
  public EagerResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    bytes = byteBuf2Bytes(buf);
  }

  @Override
  public ByteBuffer getBodyByteBuffer() {
    return ByteBuffer.wrap(bytes);
  }

Таким образом, когда часть данных поступает, она немедленно сохраняется в байтовом массиве. Затем мы можем безопасно обернуть их в HeapByteBuffer и передать в асинхронный поток в файловом канале.

Но если вы посмотрите на код LazyResponseBodyPart

  private final ByteBuf buf;

  public LazyResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    this.buf = buf;
  }
  @Override
  public ByteBuffer getBodyByteBuffer() {
    return buf.nioBuffer();
  }

Как вы можете видеть, мы фактически используем в асинхронном файловом канале поток netty ByteBuff (в данном случае всегда PooledSlicedByteBuf) через вызов метода nioBuffer

Что я могу сделать в этой ситуации, как безопасно передать DirectByteBuffer в асинхронном потоке без копирования буфера в кучу Java?

1 Ответ

0 голосов
/ 13 июня 2019

Я говорил с сопровождающим AsyncHttpClient. Здесь можно посмотреть

Основная проблема заключалась в том, что я не использовал netty ByteBuf методы retain и release. В итоге я пришел к двум решениям.

Первый: записать байты в последовательности в файл с позицией отслеживания с помощью CompletableFuture.

Определить класс оболочки для AsynchronousFileChannel

@Log4j2
public class AsyncChannelNettyByteBufWriter implements Closeable {
    private final AtomicReference<CompletableFuture<Long>> positionReference;
    private final AsynchronousFileChannel channel;

    public AsyncChannelNettyByteBufWriter(AsynchronousFileChannel channel) {
        this.channel = channel;
        try {
            this.positionReference = new AtomicReference<>(CompletableFuture.completedFuture(channel.size()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public CompletableFuture<Long> write(ByteBuf byteBuffer) {
        final ByteBuf byteBuf = byteBuffer.retain();
        return positionReference.updateAndGet(x -> x.thenCompose(position -> {
            final CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
            channel.write(byteBuf.nioBuffer(), position, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                @Override
                public void completed(Integer result, ByteBuf attachment) {
                    attachment.release();
                    writenBytes.complete(result);
                }

                @Override
                public void failed(Throwable exc, ByteBuf attachment) {
                    attachment.release();
                    log.error(exc);
                    writenBytes.completeExceptionally(exc);
                }
            });
            return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
        }));
    }

    public void close() {
        positionReference.updateAndGet(x -> x.whenComplete((position, throwable) -> {
            try {
                channel.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }));
    }
}

На самом деле, вероятно, здесь не будет AtomicReference, если запись происходит в одном потоке, а если из нескольких, то нам нужно серьезно подойти к синхронизации.

И основное использование.

public static void main(String[] args) throws IOException {
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
    final AsyncChannelNettyByteBufWriter asyncChannelNettyByteBufWriter = new AsyncChannelNettyByteBufWriter(channel);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) {
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf();
                    asyncChannelNettyByteBufWriter.write(byteBuf);
                    return State.CONTINUE;
                }

                @Override
                public Response onCompleted(Response response) {
                    asyncChannelNettyByteBufWriter.close();
                    return response;
                }
            });
}

Второе решение: отслеживать позицию на основе полученного размера байтов.

public static void main(String[] args) throws IOException {
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), new HashSet<>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)), executorService);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {

                private long position = 0;
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) {
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
                    long currentPosition = position;
                    position+=byteBuf.readableBytes();
                    channel.write(byteBuf.nioBuffer(), currentPosition, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                        @Override
                        public void completed(Integer result, ByteBuf attachment) {
                            attachment.release();
                            if(content.isLast()){
                                try {
                                    channel.close();
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuf attachment) {
                            attachment.release();
                            try {
                                channel.close();
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
                    return State.CONTINUE;
                }
                @Override
                public Response onCompleted(Response response) {
                    return response;
                }
            });
}

Во втором решении, поскольку мы не ждем, пока в файл будут записаны некоторые байты, AsynchronousFileChannel может создать много потоков (если вы используете Linux, потому что Linux не реализует неблокирующий асинхронный ввод-вывод файла). В Windows ситуация намного лучше).

Как показали мои измерения, в случае записи на медленную USB-флешку число потоков может достигать десятков тысяч, поэтому для этого вам нужно ограничить количество потоков, создав ExecutorService и передав его в AsynchronousFileChannel.

Есть ли очевидные преимущества и недостатки первого и второго решений? Мне сложно сказать. Может кто-нибудь подскажет, что эффективнее.

...