Нужен только поток для ответа при использовании HTTP-клиента Apache Async - PullRequest
0 голосов
/ 02 мая 2018

Я использую Apache Async Http Client для загрузки огромных файлов из хранилища Azure.

Это пример кода, который я использую ->

 CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault();
  httpclient.execute(request, new FutureCallback<org.apache.http.HttpResponse>() {
     @Override
     public void completed(final org.apache.http.HttpResponse httpResponse) {
     }

     @Override
     public void failed(final Exception e) {
        future.completeExceptionally(e);
     }

     @Override
     public void cancelled() {
        future.completeExceptionally(new Exception("Request cancelled"));
     }
  });

Но это сохранение файла в локальном буфере перед вызовом завершенного обратного вызова.

Я пытался использовать AsyncByteConsumer ->

AsyncByteConsumer<org.apache.http.HttpResponse>
consumer = new AsyncByteConsumer<org.apache.http.HttpResponse>() {
     @Override
     protected void onByteReceived(ByteBuffer buf, IOControl ioctrl) throws IOException {

     }

     @Override
     protected void onResponseReceived(org.apache.http.HttpResponse response) throws HttpException, IOException {
     }

     @Override
     protected org.apache.http.HttpResponse buildResult(HttpContext context) throws Exception {
        return null;
     }
  };

Это также не сработало для меня. Я получаю следующую ошибку -> java.lang.IllegalStateException: содержимое не было предоставлено

Все, что я хочу, - это получить поток ответа, который я передам своим клиентам, чтобы они могли загрузить файл напрямую, используя поток.

РЕДАКТИРОВАТЬ 1 ->

Поэтому я расширил AbstractAsyncResponseConsumer, чтобы написать своего собственного потребителя ->

public abstract  class MyConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {

private volatile HttpResponse response;
private volatile SimpleInputBuffer buf;

public MyConsumer() {
    super();
}

@Override
protected void onResponseReceived(HttpResponse response) {
    this.response = response;
}

@Override
protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
    Asserts.notNull(this.buf, "Content buffer");
    System.out.println("onContentReceived");
    buf.consumeContent(decoder);
}

protected abstract void onEntitySet(HttpResponse httpResponse);
@Override
protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException {
    System.out.println("onEntityEnclosed");
    long len = entity.getContentLength();
    if (len > Integer.MAX_VALUE) {
        throw new ContentTooLongException("Entity content is too long: " + len);
    }
    if (len < 0) {
        len = 4096;
    }
    this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
    this.response.setEntity(new ContentBufferEntity(entity, this.buf));
    onEntitySet(this.response);


}

@Override
protected HttpResponse buildResult(HttpContext context) throws Exception {
    System.out.println("buildResult");
    return response;

}

@Override
protected void releaseResources() {
    } 
 }

Вот код, который я использую для выполнения http-запроса

CompletableFuture<HttpResponse> makeRequest() {
            HttpAsyncRequestProducer producer3 = HttpAsyncMethods.create(request);
            CompletableFuture<HttpResponse> future = new CompletableFuture<>();
            httpclient.execute(producer3, new MyConsumer() {
                                   @Override
                                   protected void onEntitySet(HttpResponse httpResponse) {
                                       future.complete(httpResponse);
                                   }
                               },
                               new FutureCallback<HttpResponse>() {
                @Override
                    public void completed(HttpResponse result) {
                        System.out.println("Completed" + result);
                    }

                    @Override
                    public void failed(Exception ex) {

                    }

                    @Override
                    public void cancelled() {

                    }
                });
}


     makeRequest().thenAccept((HttpResponse httpResponse) -> {
         try {
             System.out.println(IOUtils.toString(httpResponse.getEntity().getContent()));
         } catch (IOException e) {
             e.printStackTrace();
         }
     });

Я получаю этот вывод ->

onEntityEnclosed.

java.io.IOException: базовый поток ввода возвратил ноль байтов.

Я завершаю ответ в будущем, как только получаю обратный вызов onResponseReceived, который возвращает статус и заголовки ответа.

Я считаю, что должно произойти, что обратный вызов onContentReceived будет вызываться в отдельном потоке, который будет записывать данные буфера в поток, и мой поток вызывающих может читать их в отдельном потоке.

1 Ответ

0 голосов
/ 03 мая 2018

Я не уверен, почему у вас возникают такие проблемы, но у меня работает следующий фрагмент кода (с помощью HttpAsyncClient 4.1.3)

CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
httpClient.start();
final Future<Void> future = httpClient.execute(
        HttpAsyncMethods.createGet("http://httpbin.org/"),
        new AsyncByteConsumer<Void>() {

            @Override
            protected void onByteReceived(final ByteBuffer buf, final IOControl ioctrl) throws IOException {
                System.out.println(buf.remaining());
            }

            @Override
            protected void onResponseReceived(final HttpResponse response) throws HttpException, IOException {
                System.out.println(response.getStatusLine());
            }

            @Override
            protected Void buildResult(final HttpContext context) throws Exception {
                return null;
            }

        },
        null);
future.get();
httpClient.close();

консоль>

HTTP/1.1 200 OK
1060
8192
3877
...