OkHttpAsyncHttpClient throws IOException: closed when downloading a blob from an Azure container
0 votes
/ February 12, 2020

I can't download a blob from Azure using azure-sdk-for-java. OkHttpAsyncHttpClient throws IOException: closed while it is reading the InputStream, and I don't know what is closing the stream. The azure-storage-blob version is 12.1.0 and azure-core-http-okhttp is 1.1.0, which uses com.squareup.okhttp3.okhttp version 4.2.2.

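import com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobItem;

// Build a service client that explicitly uses the OkHttp-based HttpClient.
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
                .connectionString(getConnectionString())
                .httpClient(new OkHttpAsyncHttpClientBuilder().build())
                .buildClient();

// Download every blob in the container to a local file, then delete the blob.
BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(getContainerName());
for (BlobItem blobItem : containerClient.listBlobs()) {
    String fileName = blobItem.getName();
    BlobClient blobClient2 = containerClient.getBlobClient(fileName);
    blobClient2.downloadToFile(fileName);  // fails here with IOException: closed
    blobClient2.delete();
}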

Exception:

2020-02-11 15:54:00,928 | INFO  | -pubblobsdk_Worker-2 | o.q.c.JobRunShell                | 715 - org.quartz-scheduler.quartz - 2.3.0 | Job myGroup.us_gov_dod_af_cce_mm_pubblobsdk_route2 threw a JobExecutionException: 
org.quartz.JobExecutionException: reactor.core.Exceptions$ReactiveException: java.io.IOException: closed
    at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:61) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [!/:?]
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [!/:?]
Caused by: reactor.core.Exceptions$ReactiveException: java.io.IOException: closed
    at reactor.core.Exceptions.propagate(Exceptions.java:336) ~[!/:3.3.0.RELEASE]
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91) ~[!/:3.3.0.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1663) ~[!/:3.3.0.RELEASE]
    at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:94) ~[!/:?]
    at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:481) ~[!/:?]
    at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:442) ~[!/:?]
    at mil.af.cce2.mm.templates.azure.lib.AzureLibBean.downloadBlobFromContainer(AzureLibBean.java:480) ~[!/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:97) ~[!/:2.21.0.fuse-750033-redhat-00001]
    at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:58) ~[!/:2.21.0.fuse-750033-redhat-00001]
    ... 2 more
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93) ~[!/:3.3.0.RELEASE]
        at reactor.core.publisher.Mono.block(Mono.java:1663) ~[!/:3.3.0.RELEASE]
        at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:94) ~[!/:?]
        at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:481) ~[!/:?]
        at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:442) ~[!/:?]
        at mil.af.cce2.mm.templates.azure.lib.AzureLibBean.downloadBlobFromContainer(AzureLibBean.java:480) ~[!/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:97) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:58) ~[!/:2.21.0.fuse-750033-redhat-00001]
        at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [!/:?]
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [!/:?]
Caused by: java.io.IOException: closed
    at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:434) ~[?:?]
    at java.io.InputStream.read(InputStream.java:101) ~[?:1.8.0_212]
    at com.azure.core.http.okhttp.OkHttpAsyncHttpClient$OkHttpResponse.lambda$toFluxByteBuffer$6(OkHttpAsyncHttpClient.java:293) ~[?:?]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[?:?]
    at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:79) ~[?:?]
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99) ~[?:?]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1920) ~[?:?]
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[?:?]
    at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.request(FluxTakeUntil.java:133) ~[?:?]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179) ~[?:?]
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[?:?]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:105) ~[?:?]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1920) ~[?:?]
    at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[?:?]
    at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:257) ~[?:?]
    at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:248) ~[?:?]
    at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) ~[?:1.8.0_212]
    at sun.nio.ch.SimpleAsynchronousFileChannelImpl$3.run(SimpleAsynchronousFileChannelImpl.java:389) ~[?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212]
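
The innermost cause in the trace is a plain InputStream.read() call on a source that has already been closed. As a rough illustration of that class of failure only, here is a minimal sketch that uses nothing but the JDK. ClosedStreamDemo and readAfterClose are hypothetical names, and the code says nothing about why OkHttp or the Azure SDK closes the response body early.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ClosedStreamDemo {

    /**
     * Reads one byte, closes the stream, then tries to read again.
     * Returns the message of the resulting IOException, or null if
     * no exception was thrown.
     */
    static String readAfterClose() {
        InputStream in = new BufferedInputStream(
                new ByteArrayInputStream(new byte[] {1, 2, 3}));
        try {
            in.read();   // first read succeeds
            in.close();  // the stream is closed while data is still expected
            in.read();   // reading a closed BufferedInputStream fails
            return null;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("IOException: " + readAfterClose());
    }
}
```

In the trace the second read happens on a file-channel completion thread, which suggests (but does not prove) that the body was closed on one thread while another was still draining it.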

1 Answer

0 votes
/ February 12, 2020

This looks like a bug.

Looking at the source code: if you do not add the azure-core-http-okhttp Maven dependency, the SDK uses defaultProvider, the first HttpClientProvider found on the classpath, to create the HttpClient instance.

public final class HttpClientProviders {
    private static HttpClientProvider defaultProvider;
    private static final String CANNOT_FIND_HTTP_CLIENT =
        "Cannot find any HttpClient provider on the classpath - unable to create a default HttpClient instance";

    static {
        ServiceLoader<HttpClientProvider> serviceLoader = ServiceLoader.load(HttpClientProvider.class);
        // Use the first provider found in the service loader iterator.
        Iterator<HttpClientProvider> it = serviceLoader.iterator();
        if (it.hasNext()) {
            defaultProvider = it.next();
        }
    }

    private HttpClientProviders() {
        // no-op
    }

    public static HttpClient createInstance() {
        if (defaultProvider == null) {
            throw new IllegalStateException(CANNOT_FIND_HTTP_CLIENT);
        }

        return defaultProvider.createInstance();
    }
}

By default, azure-core-http-netty is used. It contains ReactorNettyClientProvider:

public class ReactorNettyClientProvider implements HttpClientProvider {

    @Override
    public HttpClient createInstance() {
        return new NettyAsyncHttpClientBuilder().build();
    }
}

So if you do not specify an HttpClient, BlobServiceClient uses NettyAsyncHttpClient, and that works fine.

But if azure-core-http-okhttp is added to the dependencies manually and you do not specify an HttpClient in BlobServiceClientBuilder, then OkHttpClientProvider is used.

public class OkHttpClientProvider implements HttpClientProvider {

    @Override
    public HttpClient createInstance() {
        return new OkHttpAsyncHttpClientBuilder().build();
    }
}

The funny thing is that you then get the same error you posted. So I think something is wrong with the OkHttpAsyncHttpClient implementation.
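
The provider lookup described above can be sketched without the Azure SDK. This is only an illustration under stated assumptions: Provider is a hypothetical stand-in for HttpClientProvider that returns a label instead of a client, and the discovered list simulates the order in which ServiceLoader yields providers.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class FirstProviderWins {

    /** Hypothetical stand-in for HttpClientProvider; returns a label, not a client. */
    interface Provider {
        String createInstance();
    }

    /** Mirrors the static initializer: take the first provider the iterator yields, if any. */
    static Provider selectDefault(Iterable<Provider> discovered) {
        Iterator<Provider> it = discovered.iterator();
        return it.hasNext() ? it.next() : null;
    }

    /** An explicitly supplied client is used as is; otherwise fall back to the default provider. */
    static String resolve(String explicitClient, Iterable<Provider> discovered) {
        if (explicitClient != null) {
            return explicitClient;
        }
        Provider provider = selectDefault(discovered);
        if (provider == null) {
            throw new IllegalStateException(
                    "Cannot find any HttpClient provider on the classpath");
        }
        return provider.createInstance();
    }

    public static void main(String[] args) {
        Provider netty = () -> "netty";
        Provider okhttp = () -> "okhttp";
        List<Provider> nettyFirst = Arrays.asList(netty, okhttp);
        List<Provider> okhttpFirst = Arrays.asList(okhttp, netty);

        System.out.println(resolve(null, nettyFirst));
        System.out.println(resolve(null, okhttpFirst));
        System.out.println(resolve("explicit", okhttpFirst));
    }
}
```

The point of the sketch is that the default depends only on which provider is discovered first, so adding a second HTTP client library to the classpath can change the client silently unless one is passed to the builder explicitly.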


Solution

  1. Switch to azure-core-http-netty

  2. Switch to the Microsoft Azure Storage SDK client

Sample:

    public static void main(String[] args) throws Exception {
        String connectionString = "DefaultEndpointsProtocol=https;AccountName=storagetest789;AccountKey=G36mc*******j1w==;EndpointSuffix=core.windows.net";
        StorageCredentials credentials = StorageCredentials.tryParseCredentials(connectionString);

        // Set proxy
        //OperationContext.setDefaultProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1",8888)));

        CloudStorageAccount storageAccount = new CloudStorageAccount(credentials, true);
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer blobContainer = blobClient.getContainerReference("function");
        blobContainer.createIfNotExists();

//        for (ListBlobItem blobItem : blobContainer.listBlobs()) {
//            System.out.println(blobItem.getUri());
//            CloudBlockBlob blockBlob = new CloudBlockBlob(blobItem.getUri(), credentials);
//            blockBlob.downloadToFile("d:\\test\\"+blockBlob.getName());
//        }

        for (ListBlobItem blobItem : blobContainer.listBlobs()) {
            String[] parts = blobItem.getUri().toString().split("/");
            CloudBlockBlob blockBlob = blobContainer.getBlockBlobReference(parts[parts.length - 1]);
            blockBlob.downloadToFile("d:/test/" + blockBlob.getName());
        }
    }
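
One caveat about the sample above: it takes the last URI segment as the blob name, which drops any virtual-directory prefix such as reports/2020/. A minimal sketch of deriving the full name from the URI path instead, assuming the path has the form /container/blob-name; BlobNameFromUri and blobName are hypothetical helpers, not part of any Azure library, and the account in the example URI is made up.

```java
import java.net.URI;

final class BlobNameFromUri {

    /**
     * Returns everything after "/<containerName>/" in the URI path.
     * Assumes a path-style layout such as
     * https://account.blob.core.windows.net/container/dir/file.bin
     */
    static String blobName(URI blobUri, String containerName) {
        String path = blobUri.getPath();            // percent-decoded path
        String prefix = "/" + containerName + "/";
        if (path == null || !path.startsWith(prefix)) {
            throw new IllegalArgumentException(
                    "URI is not inside container " + containerName + ": " + blobUri);
        }
        return path.substring(prefix.length());
    }

    public static void main(String[] args) {
        URI uri = URI.create(
                "https://example.blob.core.windows.net/function/reports/2020/a.bin");
        System.out.println(blobName(uri, "function"));
    }
}
```

If the name contains slashes, the local target path also needs its parent directories created before downloading.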
...