Я использую библиотеку AsyncHttpClient для асинхронных неблокирующих запросов.
Мой случай: записать данные в файл по мере его поступления по сети.
Для загрузки файла с удаленного хоста и сохранения в файл я использовал значения по умолчанию ResponseBodyPartFactory.EAGER
и AsynchronousFileChannel
, чтобы не блокировать поток нетто при поступлении данных. Но, как показали мои измерения, по сравнению с LAZY
потребление памяти в куче Java увеличивается во много раз.
Поэтому я решил сразу перейти к LAZY
, но не учел последствия для файлов.
Этот код поможет воспроизвести проблему .:
public static class AsyncChannelWriter {
private final CompletableFuture<Integer> startPosition;
private final AsynchronousFileChannel channel;
public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
this.channel = channel;
this.startPosition = CompletableFuture.completedFuture((int) channel.size());
}
public CompletableFuture<Integer> getStartPosition() {
return startPosition;
}
public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {
return currentPosition.thenCompose(position -> {
CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
writenBytes.complete(result);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
writenBytes.completeExceptionally(exc);
}
});
return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
});
}
public void close(CompletableFuture<Integer> currentPosition) {
currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
}
}
public static void main(String[] args) throws IOException {
final String filepath = "/media/veracrypt4/files/1.jpg";
final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
client.prepareGet(downloadUrl)
.execute(new AsyncCompletionHandler<Response>() {
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
return State.CONTINUE;
}
@Override
public Response onCompleted(Response response) {
asyncChannelWriter.close(atomicReferencePosition.get());
return response;
}
});
}
в этом случае изображение не работает. Но если я использую FileChannel
вместо AsynchronousFileChannel
, в обоих случаях файлы получаются нормальными. Могут ли быть нюансы при работе с DirectByteBuffer
(в случае с LazyResponseBodyPart.getBodyByteBuffer()
) и AsynchronousFileChannel
?
Что может быть не так с моим кодом, если все нормально работает с EAGER
?
UPDATE
Как я заметил, если я использую LAZY
, и, например, я добавляю строку
Thread.sleep (10)
в методе onBodyPartReceived
, вот так:
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
Thread.sleep(10);
return State.CONTINUE;
}
Файл сохраняется на диске в нерабочем состоянии.
Насколько я понимаю, причина в том, что в течение этих 10 миллисекунд асинхронному потоку в AsynchronousFileChannel
удается записать данные на диск с этого DirectByteBuffer
.
Оказывается, файл поврежден из-за того, что этот асинхронный поток использует этот буфер для записи вместе с потоком netty.
Если мы посмотрим на исходный код с EagerResponseBodyPart
, то увидим следующее
private final byte[] bytes;
public EagerResponseBodyPart(ByteBuf buf, boolean last) {
super(last);
bytes = byteBuf2Bytes(buf);
}
@Override
public ByteBuffer getBodyByteBuffer() {
return ByteBuffer.wrap(bytes);
}
Таким образом, когда часть данных поступает, она немедленно сохраняется в байтовом массиве. Затем мы можем безопасно обернуть их в HeapByteBuffer и передать в асинхронный поток в файловом канале.
Но если вы посмотрите на код LazyResponseBodyPart
private final ByteBuf buf;
public LazyResponseBodyPart(ByteBuf buf, boolean last) {
super(last);
this.buf = buf;
}
@Override
public ByteBuffer getBodyByteBuffer() {
return buf.nioBuffer();
}
Как вы можете видеть, мы фактически используем в асинхронном файловом канале поток netty ByteBuff
(в данном случае всегда PooledSlicedByteBuf
) через вызов метода nioBuffer
Что я могу сделать в этой ситуации, как безопасно передать DirectByteBuffer
в асинхронном потоке без копирования буфера в кучу Java?