Подтверждение потребления сообщений в Apache Kafka - PullRequest
0 голосов
/ 06 июня 2018

Я реализовал Java Consumer, который потребляет сообщения из раздела Kafka, которые затем отправляются с запросами POST в REST API.

    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(200);
        for (ConsumerRecord<String, Object> record : records) {
            CloseableHttpClient httpClient = HttpClientBuilder.create().build();  
            Object message = record.value();
            JSONObject jsonObj = new JSONObject(message.toString());
            try {
                HttpPost request = new HttpPost(this.getConsumerProperties().getProperty("api.url"));
                StringEntity params = new StringEntity(message.toString());
                request.addHeader("content-type", "application/json");
                request.addHeader("Accept", "application/json");
                request.setEntity(params);

                CloseableHttpResponse response = httpClient.execute(request);
                HttpEntity entity = response.getEntity();
                String responseString = EntityUtils.toString(entity, "UTF-8");
                System.out.println(message.toString());
                System.out.println(responseString);
            } catch(Exception ex) {
              ex.printStackTrace();
            }  finally {
                try {   
                    httpClient.close(); 
                } catch(IOException ex) {
                    ex.printStackTrace();
                } 
            }                  
        } 
    }   

Скажите, что сообщение было использовано, но класс Java не смог связаться с REST API.Сообщение никогда не будет доставлено, но оно будет помечено как использованное.Как лучше всего обращаться с такими случаями?Могу ли я как-то подтвердить сообщения, если и только если ответ от REST API был успешным?

1 Ответ

0 голосов
/ 06 июня 2018

В свойствах потребителя установите для enable.auto.commit значение false.Это будет означать, что ответственность за принятие смещения лежит на потребителе.

Итак, в приведенном выше примере на основе response.statusCode вы можете выбрать фиксацию смещения, вызвав consumer.commitAsync ().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...