Как получить потоковый ответ от KSQL в Spring Kafka? - PullRequest
0 голосов
/ 19 января 2019

Как получить chunked-ответ от KSQL-сервера kafka в приложении весенней загрузки java?

Когда я делаю вызов rest в конечную точку /query, я просто получаю 1 строку, и соединение закрывается.Как я могу держать соединение открытым и получать несколько строк?

Документ говорит:

Ответ возвращается обратно до тех пор, пока не будет достигнут предел, указанный в операторе, или клиент не закроетсоединение.

Как это сделать в Java?Даже для KTable я получаю только 1 строку взамен.

https://docs.confluent.io/current/ksql/docs/developer-guide/api.html#run-a-query-and-stream-back-the-output

1 Ответ

0 голосов
/ 20 января 2019

Я смог обойти это следующим образом:

  • получить ответ в виде строки
  • анализировать объекты JSON построчно (KafkaQueryResponseявляется объектом, представляющим 1 строку)

        ResponseEntity<String> result = template.exchange("/query",
            HttpMethod.POST,
            new HttpEntity<>(params, headers),
            String.class);
    
        List<KafkaQueryResponse> array = new ArrayList<>();
        JsonFactory jsonFactory = new JsonFactory();
        try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {
            Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);
            value.forEachRemaining(e -> {
                if (e.getRow() != null) {
                    array.add(e);
                }
            });
        }
        array <----  this is the list of JSON objects
    

KafkaQueryResponse

    @Data
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class KafkaQueryResponse {
        private KafkaQueryRow row;
        private String finalMessage;
        private String errorMessage;

        @Data
        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class KafkaQueryRow {
            private List<Object> columns;
        }
    }

Это решение не позволяет считывать потоковый ответ в виде фрагментов.Он ожидает полного ответа клиента, затем закрывает соединение и затем анализирует все объекты json.

...