Я реализовал Java Consumer, который потребляет сообщения из раздела Kafka, которые затем отправляются с запросами POST в REST API.
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
Object message = record.value();
JSONObject jsonObj = new JSONObject(message.toString());
try {
HttpPost request = new HttpPost(this.getConsumerProperties().getProperty("api.url"));
StringEntity params = new StringEntity(message.toString());
request.addHeader("content-type", "application/json");
request.addHeader("Accept", "application/json");
request.setEntity(params);
CloseableHttpResponse response = httpClient.execute(request);
HttpEntity entity = response.getEntity();
String responseString = EntityUtils.toString(entity, "UTF-8");
System.out.println(message.toString());
System.out.println(responseString);
} catch(Exception ex) {
ex.printStackTrace();
} finally {
try {
httpClient.close();
} catch(IOException ex) {
ex.printStackTrace();
}
}
}
}
Скажите, что сообщение было использовано, но класс Java не смог связаться с REST API.Сообщение никогда не будет доставлено, но оно будет помечено как использованное.Как лучше всего обращаться с такими случаями?Могу ли я как-то подтвердить сообщения, если и только если ответ от REST API был успешным?