Kafka Concurrent Consumer делает автоматическое принятие кафки в значение false. Затем, основываясь на другом вызове Microservice в Async, используя Rest Template, мы пытаемся подтвердить ответ. Мы получаем ошибку, что KafkaConsumer не является безопасным для многопоточного доступа, в то время какcknowledge.acknowledge ();выполняется.
Если мы удаляем асинхронность в методе, она не вызывает исключение и работает нормально.
@Async("******")
public CompletableFuture<String> process*****Service(****** ******, String serviceUrl, Acknowledgment acknowledgment){
ResponseEntity<String> serviceResponse = null;
int retryCount = 0;
String response = null;
try {
HttpHeaders headers = KafkaConsumerUtil.prepareHeaders(ApplicationConstant.****, apikey);
HttpEntity<******> entity = new HttpEntity<>(*****, headers);
retryCount++;
System.out.println("Attempting retry mechanism with counter "+retryCount);
serviceResponse = restTemplate.exchange(serviceUrl, HttpMethod.POST, entity, String.class);
if (null != serviceResponse) {
response = serviceResponse.getStatusCode().toString();
acknowledgment.acknowledge();
}
}catch (Exception e) {
log.error(ApplicationConstant.*****.concat("Exception caught at process*****Service "+e.getMessage()));
String producerUrl = appProperties.getKafkapropmap().get(ApplicationConstant.PUBLISHER_MS_SERVICE_URL);
// deadLetterTopic.invokeDeadLetterTopic(****Request, producerUrl, acknowledgment);
}
return CompletableFuture.completedFuture(response);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Long, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(Integer.parseInt(concurrency));
return factory;
}
and
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Ошибка
2019-10-17 11:59:03.612 [Pre-******-3] ERROR c.a.m.reg.service.******* -
********: Exception caught at ******Service KafkaConsumer is not safe for multi-threaded access
Мы используем spring-kafka 1.3.0.RELEASE
Любые советы приветствуются !!