Использование Java 11 HttpClient для чтения фрагментированных данных - PullRequest
0 голосов
/ 16 октября 2018

Я пытаюсь прочитать фрагментированные данные из ответа Http, используя Java 11 java.net.http.HttpClient, но я получаю только одну строку за раз.Мне нужно получить весь кусок за раз.

Вот мой код:

    final InputStream eventStream;

    try {
        HttpResponse<InputStream> httpResponse = httpClient.send(HttpRequest
                .newBuilder(
                        new URI(this.config.getEnvironmentAccess().getUrl() + ":<port>/status/?pretty=true"))
                .GET().build(), BodyHandlers.ofInputStream());

        LOGGER.info("event stream HttpResponse received");
        LOGGER.info("statusCode: {}", httpResponse.statusCode());
        LOGGER.info("headers: {}", httpResponse.headers());
        LOGGER.info("version: {}", httpResponse.version());
        LOGGER.info("request: {}", httpResponse.request());

        eventStream = httpResponse.body();
    } catch (IOException | InterruptedException | URISyntaxException e) {
        throw new RuntimeException("Unable to get status event stream", e);
    }

    BufferedReader br = new BufferedReader(new InputStreamReader(eventStream));
    String line = "";

    try {
        while ((line = br.readLine()) != null) {
            LOGGER.info("readLine(): {}", line);
        }
    } catch (IOException e) {
        throw new RuntimeException("Unable to read status event stream", e);
    }

Но я получаю ответ (ы) в виде отдельных строк:

824 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:352 - starting to listen for thing script responses . . .
2018-10-15 20:43:34,057-0400 9107 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:362 - Thing event stream HttpResponse received
2018-10-15 20:43:34,058-0400 9108 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:363 - statusCode: 200
2018-10-15 20:43:34,059-0400 9109 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:364 - headers: java.net.http.HttpHeaders@a6764984 { {content-type=[application/json], date=[Tue, 16 Oct 2018 00:43:34 GMT], server=[Jetty(9.2.z-SNAPSHOT)], transfer-encoding=[chunked]} }
2018-10-15 20:43:34,060-0400 9110 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:365 - version: HTTP_1_1
2018-10-15 20:43:34,060-0400 9110 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:366 - request: http://thing-url.company.com:<port>/status/?pretty=true GET
2018-10-15 20:43:34,063-0400 9113 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): {
2018-10-15 20:43:34,063-0400 9113 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   "Header" : {
2018-10-15 20:43:34,064-0400 9114 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     "name" : "header",
2018-10-15 20:43:34,066-0400 9116 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     "value" : {
2018-10-15 20:43:34,067-0400 9117 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       "version" : 1.2
2018-10-15 20:43:34,067-0400 9117 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     }
2018-10-15 20:43:34,068-0400 9118 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   }
2018-10-15 20:43:34,100-0400 9150 [main] INFO    c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #1 of 10
2018-10-15 20:43:34,111-0400 9161 [main] INFO       c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:19.054Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0","service":"dataManagerService.getData","requestNumber":177}
2018-10-15 20:43:34,188-0400 9238 [main] INFO    c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #2 of 10
2018-10-15 20:43:34,191-0400 9241 [main] INFO       c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:23.157Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0ddeebddaa854c6ab2ac33b911af28aa","service":"dataManagerService.getData","requestNumber":180}
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   "Group" : {
2018-10-15 20:43:34,233-0400 9283 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     "name" : "groupStart",
2018-10-15 20:43:34,234-0400 9284 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     "value" : {
2018-10-15 20:43:34,234-0400 9284 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       "groupId" : "57b94dcc-fad8-40f2-bb86-6d6894f26f26",
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       "requestInfo" : [ {
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "sessionId" : "5fa9bc47fd5d450da9323b5d35b14e89",
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "requestId" : "25ed5909e04d4caba4d57c41dd85dea0"
2018-10-15 20:43:34,235-0400 9285 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       } ]
2018-10-15 20:43:34,236-0400 9286 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     }
2018-10-15 20:43:34,236-0400 9286 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   }
2018-10-15 20:43:34,245-0400 9295 [main] INFO    c.p.perf.replay.ThingReplay.replayScripts:108 - submitting thing script #3 of 10
2018-10-15 20:43:34,246-0400 9296 [main] INFO       c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to thing: {"submissionTime":"2018-10-03T04:34:27.362Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0b8fa91c6b20478589d66f90af80f481","service":"dataManagerService.getData","requestNumber":182}
2018-10-15 20:43:34,246-0400 9296 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   "Event" : {
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     "name" : "jobStart",
2018-10-15 20:43:34,247-0400 9297 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     "value" : {
2018-10-15 20:43:34,248-0400 9298 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       "stats" : {
2018-10-15 20:43:34,249-0400 9299 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "totalTasks" : 1,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "runningTasks" : 0,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "completeTasks" : 0,
2018-10-15 20:43:34,250-0400 9300 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "failedTasks" : 0
2018-10-15 20:43:34,251-0400 9301 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       },
2018-10-15 20:43:34,251-0400 9301 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       "coreMsUsed" : -1,
2018-10-15 20:43:34,252-0400 9302 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       "requestInfo" : [ {
2018-10-15 20:43:34,252-0400 9302 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "sessionId" : "5fa9bc47fd5d450da9323b5d35b14e89",
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():         "requestId" : "25ed5909e04d4caba4d57c41dd85dea0"
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():       } ]
2018-10-15 20:43:34,253-0400 9303 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():     }
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   }
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine(): } {
2018-10-15 20:43:34,254-0400 9304 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForEvents:378 - readLine():   "Event" : {

Я понимаю, что это потому, что я использую BufferedReader и говорю ему записывать каждую строку, когда я получаю ее, но я не знаю, как читать строки как куски.

Может ли кто-нибудь помочь мне понять, как атомарно читать чанки, чтобы я мог анализировать их как JSON, по одному событию за раз?


Решение

Пришлось использовать JsonProcessor для анализа потока Json по мере его поступления:

private void listenForEvents(final IMakeHttpRequests httpClient) {
    assert httpClient != null : "httpClient cannot be null";
    LOGGER.info("starting to listen for script responses . . .");

    final InputStream eventStream;
    final JsonParser parser;

    try {
        HttpResponse<InputStream> httpResponse = httpClient.send(HttpRequest
                .newBuilder(
                        new URI(this.config.getEnvironmentAccess().getUrl() + ":<port>/status/?pretty=true"))
                .GET().build(), BodyHandlers.ofInputStream());

        eventStream = httpResponse.body();
    } catch (IOException | InterruptedException | URISyntaxException e) {
        throw new RuntimeException("Unable to get status event stream", e);
    }

    try {
        parser = JsonUtils.getObjectMapper().getFactory().createParser(eventStream);

        while (parser.nextToken() != null) {
            final TreeNode tree = parser.readValueAsTree();
            LOGGER.info("tree: {}", tree);
        }
    } catch (IOException e) {
        throw new RuntimeException("Unable to parse event stream to Json", e);
    }
}

2018-10-16 13:25:32,081-0400 12622 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:352 - starting to listen for script responses . . .
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:363 - Thing event stream HttpResponse received
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:364 - statusCode: 200
2018-10-16 13:25:32,808-0400 13349 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:365 - headers: java.net.http.HttpHeaders@67b87c21 { {content-type=[application/json], date=[Tue, 16 Oct 2018 17:25:33 GMT], server=[Jetty(9.2.z-SNAPSHOT)], transfer-encoding=[chunked]} }
2018-10-16 13:25:32,809-0400 13350 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:366 - version: HTTP_1_1
2018-10-16 13:25:32,809-0400 13350 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:367 - request: http://<server>.company.com:<port>/status/?pretty=true GET
2018-10-16 13:25:32,819-0400 13360 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Header":{"name":"header","value":{"version":1.2}}}
2018-10-16 13:25:32,835-0400 13376 [main] INFO    c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #1 of 10
2018-10-16 13:25:32,843-0400 13384 [main] INFO       c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:19.054Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0","service":"dataManagerService.getData","requestNumber":177}
2018-10-16 13:25:33,074-0400 13615 [main] INFO    c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #2 of 10
2018-10-16 13:25:33,075-0400 13616 [main] INFO       c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:23.157Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0ddeebddaa854c6ab2ac33b911af28aa","service":"dataManagerService.getData","requestNumber":180}
2018-10-16 13:25:33,117-0400 13658 [main] INFO    c.p.perf.replay.ThingReplay.replayScripts:108 - submitting pipeline script #3 of 10
2018-10-16 13:25:33,117-0400 13658 [main] INFO       c.p.perf.api.dao.ThingDao.submitScript:100 - submitting script to pipeline: {"submissionTime":"2018-10-03T04:34:27.362Z[GMT]","user":"userperson@company.com","sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"0b8fa91c6b20478589d66f90af80f481","service":"dataManagerService.getData","requestNumber":182}
2018-10-16 13:25:33,237-0400 13778 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Group":{"name":"groupStart","value":{"groupId":"b205e673-2eb1-4352-b207-fd4917be292a","requestInfo":[{"sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0"}]}}}
2018-10-16 13:25:33,238-0400 13779 [pool-1-thread-1] INFO     c.p.perf.replay.ThingReplay.listenForThingEvents:379 - tree: {"Event":{"name":"jobStart","value":{"stats":{"totalTasks":1,"runningTasks":0,"completeTasks":0,"failedTasks":0},"coreMsUsed":-1,"requestInfo":[{"sessionId":"5fa9bc47fd5d450da9323b5d35b14e89","requestId":"25ed5909e04d4caba4d57c41dd85dea0"}]}}}
. . .

Ответы [ 2 ]

0 голосов
/ 16 октября 2018

Кажется, что вы читаете поток построчно, независимо от того, как поступают чанки (BufferedReader заботится о переформатировании данных в строки).Однако никто не гарантирует, что один блок содержит одну запись JSON.Сервер может порционировать данные на основе своего внутреннего буфера.

При анализе потока вы можете взглянуть на потоковый API Джексона.Модель программирования немного сложнее, но она может удовлетворить ваши потребности.Пример https://www.baeldung.com/jackson-streaming-api или реализация jre по умолчанию https://docs.oracle.com/javaee/7/api/javax/json/stream/package-summary.html

0 голосов
/ 16 октября 2018

Вы можете использовать StringBuilder, чтобы получить желаемый результат.

try {
     StringBuilder sb = new StringBuilder();
     while ((line = br.readLine()) != null) {
         sb.append(line);
     }
     LOGGER.info("readLine(): {}", sb.toString());
} catch (IOException e) {
    throw new RuntimeException("Unable to read status event stream", e);
}
...