Потребитель Kafka при автоматической фиксации false создает исключение при подтверждении асинхронного вызова «KafkaConsumer небезопасен для многопоточного доступа» - PullRequest
0 голосов
/ 17 октября 2019

Kafka Concurrent Consumer делает автоматическое принятие кафки в значение false. Затем, основываясь на другом вызове Microservice в Async, используя Rest Template, мы пытаемся подтвердить ответ. Мы получаем ошибку, что KafkaConsumer не является безопасным для многопоточного доступа, в то время какcknowledge.acknowledge ();выполняется.

Если мы удаляем асинхронность в методе, она не вызывает исключение и работает нормально.

    @Async("******")
    public CompletableFuture<String> process*****Service(****** ******, String serviceUrl, Acknowledgment acknowledgment){

        ResponseEntity<String> serviceResponse = null;
        int retryCount = 0;
        String response = null;
        try {
            HttpHeaders headers = KafkaConsumerUtil.prepareHeaders(ApplicationConstant.****, apikey);
            HttpEntity<******> entity = new HttpEntity<>(*****, headers);
            retryCount++; 
            System.out.println("Attempting retry mechanism with counter "+retryCount);
            serviceResponse = restTemplate.exchange(serviceUrl, HttpMethod.POST, entity, String.class);
            if (null != serviceResponse) {
                response = serviceResponse.getStatusCode().toString();
                acknowledgment.acknowledge();
            }
        }catch (Exception e) {
            log.error(ApplicationConstant.*****.concat("Exception caught at process*****Service "+e.getMessage()));
            String producerUrl = appProperties.getKafkapropmap().get(ApplicationConstant.PUBLISHER_MS_SERVICE_URL);
//          deadLetterTopic.invokeDeadLetterTopic(****Request, producerUrl, acknowledgment);
        }
        return CompletableFuture.completedFuture(response);
    }



    @Bean
    public ConcurrentKafkaListenerContainerFactory<Long, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Long, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(Integer.parseInt(concurrency));
        return factory;
    }

    and
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 

Ошибка

2019-10-17 11:59:03.612 [Pre-******-3] ERROR c.a.m.reg.service.******* -
                ********: Exception caught at ******Service KafkaConsumer is not safe for multi-threaded access

Мы используем spring-kafka 1.3.0.RELEASE

Любые советы приветствуются !!

1 Ответ

0 голосов
/ 17 октября 2019

Во-первых, вы должны обновить до последней версии 1.3.x (1.3.10).

В более старых версиях вы не можете использовать MANUAL_IMMEDIATE с асинхронными подтверждениями;вы можете использовать только AckMode.MANUAL, который ставит в очередь подтверждение для основного потока потребителя, который будет выполнять фиксацию.

Потребитель не является потокобезопасным.

MANUAL_IMMEDIATE можно использовать только в том случае, еслиподтверждение выполняется потоком, который вызывает слушателя. Контейнер теперь определяет, какой поток выполняет подтверждение, и поставит в очередь асинхронную фиксацию.

Это было исправлено в 1.3.4, но последняя версия - 1.3.10.

...