Зафиксировать сообщение асинхронно сразу после прочтения темы - PullRequest
3 голосов
/ 29 мая 2019

Я пытаюсь зафиксировать сообщение сразу после прочтения его из темы.Я перешел по этой ссылке (https://www.confluent.io/blog/apache-kafka-spring-boot-application), чтобы создать получателя Kafka с пружиной. Обычно он работает отлично, и получатель получает сообщение и ждет, пока в очередь не войдет другой человек. Но проблема в том, что когда я обрабатываю это сообщениеэто занимает много времени (около 10 минут), очередь kafka считает, что сообщение не израсходовано (передано), и потребители читают его снова и снова. Я должен сказать, что, когда мое время обработки меньше 5 минут, оно работает хорошоно когда он длится дольше, он не фиксирует сообщение.

Я искал несколько ответов, но это не помогло мне, потому что я не использую один и тот же исходный код (и, конечно, другойструктура). Я пытался отправить асинхронные методы, а также для асинхронной фиксации сообщения, но мне не удалось. Некоторые из источников:

Spring Boot Kafka: Не удается выполнить коммит, так как группауже перебалансирован

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Kafka 0.10 Java-потребитель нечтение сообщения из темы

https://github.com/confluentinc/confluent-kafka-dotnet/issues/470

Основной класс находится здесь:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }


Класс потребителя (где я должен зафиксировать свое сообщение)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }


Как я могу зафиксировать сообщение сразу после того, как прочитал его из очереди.

Я хочу быть уверен, что при получении сообщения я немедленно отправляю сообщение.Прямо сейчас сообщение передается, когда я заканчиваю выполнять метод сразу после (System.out.println).Так может кто-нибудь сказать мне, как это сделать?

----- update -------

Извините за поздний ответ, но, как подсказал @GirishB, я искалк конфигурации GirishB, но я не вижу, где я могу определить тему, которую я хочу прочитать / прослушать из моего файла конфигурации (application.yml).Все примеры, которые я вижу, используют структуру, подобную этой (http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html). Есть ли возможность, что я могу прочитать тему, объявленную на другом сервере? Используя что-то похожее на этот @KafkaListener (themes = "$ {app.topic.pro} ", groupId =" group_id ")

=========== РЕШЕНИЕ 1 ========================================

Я следовал совету @victor gallet и включил объявление о конфумерах в oder для размещения «Подтверждения»объект в методе потребления. Я также перешел по этой ссылке (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration.java), чтобы получить все методы, которые я использовал для объявления и установки всех свойств (consumerProperties, consumerFactory, kafkaListenerContainerFactory). Единственная проблема, которую я нашелэто объявление "new SeekToCurrentErrorHandler ()", потому что я получаю сообщение об ошибке, и на данный момент я не могу ее устранить (было бы здорово, если бы кто-то объяснил мне это).


@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;


   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

     @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 
        return props;
    }




    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
        acknowledgment.acknowledge();// commit immediately
        Properties  props=prop.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }

``````````````````````````````````````````````````````````

Ответы [ 2 ]

2 голосов
/ 15 июля 2019

Вам необходимо изменить конфигурацию потребителя, указав для свойства enable.auto.commit значение false:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Затем вам нужно изменить заводскую настройку Spring Kafka Listener и установить ack-mode на MANUAL_IMMEDIATE. Вот пример ConcurrentKafkaListenerContainerFactory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

Как объяснено в документации, MANUAL_IMMEDIATE означает: зафиксировать смещение немедленно, когда слушатель вызывает метод Acknowledgment.acknowledge ().

Вы можете найти все способы совершения здесь .

Затем в своем коде слушателя вы можете зафиксировать смещение вручную, добавив объект Acknowledgment, например:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
   // commit immediately
    acknowledgment.acknowledge();
}
1 голос
/ 29 мая 2019

Вы можете использовать java.util.concurrent.BlockingQueue для отправки сообщения по мере использования и фиксации смещения Кафки.Затем с помощью другого потока получают сообщение из blockingQueue и обрабатывают.Таким образом, вам не нужно ждать завершения обработки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...