Как вы читаете и распечатываете ответ HTTP с использованием java.net.http по мере поступления фрагментов? - PullRequest
0 голосов
/ 05 октября 2018

Java 11 представляет новый пакет java.net.http для выполнения HTTP-запросов.Для общего использования это довольно просто.

Мой вопрос: как мне использовать java.net.http для обработки фрагментированных ответов при получении клиентом каждого фрагмента?

java.http.net содержитBodySubscriber

http_get_demo.py

Ниже приведена реализация на python, которая печатает куски по мере их появления.Я хотел бы сделать то же самое с java.net.http:

import argparse
import requests


def main(url: str):
    with requests.get(url, stream=True) as r:
        for c in r.iter_content(chunk_size=1):
            print(c.decode("UTF-8"), end="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Read from a URL and print as text as chunks arrive")
    parser.add_argument('url', type=str, help="A URL to read from")
    args = parser.parse_args()

    main(args.url)

HttpGetDemo.java

Просто для полноты, вот простой пример создания запроса на блокировку с использованием Java.net.http:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

public class HttpGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();
    var response = client.send(request, bodyHandler);
    System.out.println(response.body());

  }
}

HttpAsyncGetDemo.java

А вот пример выполнения неблокирующего / асинхронного запроса:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

/**
 * ReadChunked
 */
public class HttpAsyncGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();

    client.sendAsync(request, bodyHandler)
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();

  }
}

Ответы [ 3 ]

0 голосов
/ 12 октября 2018

Код Python не гарантирует, что данные тела ответа будут доступны по одному HTTP-чанку за один раз.Он просто предоставляет небольшое количество данных приложению, тем самым уменьшая объем памяти, потребляемой на уровне приложения (она может быть буферизована ниже в стеке).HTTP-клиент Java 11 поддерживает потоковую передачу через один из обработчиков потокового тела: HttpResponse.BodyHandlers: ofInputStream, ofByteArrayConsumer, asLines и т. Д.

Или напишите свой собственный обработчик / подписчик, как показано ниже: https://www.youtube.com/watch?v=qiaC0QMLz5Y

0 голосов
/ 30 октября 2018

Спасибо @pavel и @ chegar999 за частичные ответы.Они привели меня к моему решению.

Обзор

Решение, которое я нашел, приведено ниже.По сути, решение заключается в использовании пользовательского java.net.http.HttpResponse.BodySubscriber.BodySubscriber содержит реактивные методы (onSubscribe, onNext, onError и onComplete) и метод getBody, который в основном возвращает java CompletableFuture , который в конечном итоге создаст тело HTTP-запроса.Если у вас есть BodySubscriber, вы можете использовать его следующим образом:

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create(uri))
    .build();

return client.sendAsync(request, responseInfo -> new StringSubscriber())
    .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
    .thenApply(HttpResponse::body);

Обратите внимание на строку:

client.sendAsync(request, responseInfo -> new StringSubscriber())

Здесь мы регистрируем нашего пользовательского BodySubscriber;в этом случае мой пользовательский класс называется StringSubscriber.

CustomSubscriber.java

Это полный рабочий пример.Используя Java 11, вы можете запустить его без компиляции.Просто вставьте его в файл с именем CustomSubscriber.java, затем выполните команду java CustomSubscriber <some url>.Он печатает содержимое каждого куска по мере его поступления.Он также собирает их и возвращает их как тело после завершения ответа.

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import java.util.List;

public class CustomSubscriber {

  public static void main(String[] args) {
    CustomSubscriber cs = new CustomSubscriber();
    String body = cs.get(args[0]).join();
    System.out.println("--- Response body:\n: ..." + body + "...");
  }

  public CompletableFuture<String> get(String uri) {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();

    return client.sendAsync(request, responseInfo -> new StringSubscriber())
        .whenComplete((r, t) -> System.out.println("--- Status code " + r.statusCode()))
        .thenApply(HttpResponse::body);
  }

  static class StringSubscriber implements BodySubscriber<String> {

    final CompletableFuture<String> bodyCF = new CompletableFuture<>();
    Flow.Subscription subscription;
    List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();

    @Override
    public CompletionStage<String> getBody() {
      return bodyCF;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // Request first item
    }

    @Override
    public void onNext(List<ByteBuffer> buffers) {
      System.out.println("-- onNext " + buffers);
      try {
        System.out.println("\tBuffer Content:\n" + asString(buffers));
      } 
      catch (Exception e) {
        System.out.println("\tUnable to print buffer content");
      }
      buffers.forEach(ByteBuffer::rewind); // Rewind after reading
      responseData.addAll(buffers);
      subscription.request(1); // Request next item
    }

    @Override
    public void onError(Throwable throwable) {
      bodyCF.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
      bodyCF.complete(asString(responseData));
    }

    private String asString(List<ByteBuffer> buffers) {
      return new String(toBytes(buffers), StandardCharsets.UTF_8);
    }

    private byte[] toBytes(List<ByteBuffer> buffers) {
      int size = buffers.stream()
          .mapToInt(ByteBuffer::remaining)
          .sum();
      byte[] bs = new byte[size];
      int offset = 0;
      for (ByteBuffer buffer : buffers) {
        int remaining = buffer.remaining();
        buffer.get(bs, offset, remaining);
        offset += remaining;
      }
      return bs;
    }

  }
}

Проверка его

Чтобы протестировать это решение, вам потребуется сервер, который отправляет ответ, которыйиспользует Transfer-encoding: chunked и отправляет его достаточно медленно, чтобы наблюдать за прибытием кусков.Я создал его в https://github.com/hohonuuli/demo-chunk-server, но вы можете раскрутить его с помощью Docker следующим образом:

docker run -p 8080:8080 hohonuuli/demo-chunk-server

Затем запустите код CustomSubscriber.java, используя java CustomSubscriber.java http://localhost:8080/chunk/10

0 голосов
/ 12 октября 2018

Вы можете печатать ByteBuffer s по мере их поступления, но нет гарантии, что ByteBuffer соответствует чанку.Куски обрабатываются стеком.Один ByteBuffer фрагмент будет выдвинут для каждого фрагмента, но если в буфере недостаточно места, будет добавлен частичный фрагмент.Потребитель видит только поток ByteBuffer с данными.Поэтому вы можете распечатать эти ByteBuffer по мере их поступления, но у вас нет гарантии, что они точно соответствуют одному чанку, как было отправлено сервером.

Примечание. Если тело вашего запросана основе текста, тогда вы можете использовать BodyHandlers.fromLineSubscriber(Subscriber<? super String> subscriber) с пользовательским Subscriber<String>, который будет печатать каждую строку, как она есть.BodyHandlers.fromLineSubscriber делает жесткое слово декодирования байтов в символы, используя набор символов, указанный в заголовках ответа, буферизует байты, если необходимо, до тех пор, пока они не будут декодированы (ByteBuffer может заканчиваться в середине последовательности кодирования, если текст содержит символы, закодированные внесколько байтов), и разбивая их на границе строки.Метод Subscriber :: onNext будет вызываться один раз для каждой строки в тексте.См. https://download.java.net/java/early_access/jdk11/docs/api/java.net.http/java/net/http/HttpResponse.BodyHandlers.html#fromLineSubscriber(java.util.concurrent.Flow.Subscriber) для получения дополнительной информации.

...