Я пытаюсь зафиксировать сообщение сразу после прочтения его из темы.Я перешел по этой ссылке (https://www.confluent.io/blog/apache-kafka-spring-boot-application), чтобы создать получателя Kafka с пружиной. Обычно он работает отлично, и получатель получает сообщение и ждет, пока в очередь не войдет другой человек. Но проблема в том, что когда я обрабатываю это сообщениеэто занимает много времени (около 10 минут), очередь kafka считает, что сообщение не израсходовано (передано), и потребители читают его снова и снова. Я должен сказать, что, когда мое время обработки меньше 5 минут, оно работает хорошоно когда он длится дольше, он не фиксирует сообщение.
Я искал несколько ответов, но это не помогло мне, потому что я не использую один и тот же исходный код (и, конечно, другойструктура). Я пытался отправить асинхронные методы, а также для асинхронной фиксации сообщения, но мне не удалось. Некоторые из источников:
Spring Boot Kafka: Не удается выполнить коммит, так как группауже перебалансирован
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
Kafka 0.10 Java-потребитель нечтение сообщения из темы
https://github.com/confluentinc/confluent-kafka-dotnet/issues/470
Основной класс находится здесь:
@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}
Класс потребителя (где я должен зафиксировать свое сообщение)
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
Как я могу зафиксировать сообщение сразу после того, как прочитал его из очереди.
Я хочу быть уверен, что при получении сообщения я немедленно отправляю сообщение.Прямо сейчас сообщение передается, когда я заканчиваю выполнять метод сразу после (System.out.println).Так может кто-нибудь сказать мне, как это сделать?
----- update -------
Извините за поздний ответ, но, как подсказал @GirishB, я искалк конфигурации GirishB, но я не вижу, где я могу определить тему, которую я хочу прочитать / прослушать из моего файла конфигурации (application.yml).Все примеры, которые я вижу, используют структуру, подобную этой (http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html). Есть ли возможность, что я могу прочитать тему, объявленную на другом сервере? Используя что-то похожее на этот @KafkaListener (themes = "$ {app.topic.pro} ", groupId =" group_id ")
=========== РЕШЕНИЕ 1 ========================================
Я следовал совету @victor gallet и включил объявление о конфумерах в oder для размещения «Подтверждения»объект в методе потребления. Я также перешел по этой ссылке (https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java/org/s1p/CommonConfiguration.java), чтобы получить все методы, которые я использовал для объявления и установки всех свойств (consumerProperties, consumerFactory, kafkaListenerContainerFactory). Единственная проблема, которую я нашелэто объявление "new SeekToCurrentErrorHandler ()", потому что я получаю сообщение об ошибке, и на данный момент я не могу ее устранить (было бы здорово, если бы кто-то объяснил мне это).
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
``````````````````````````````````````````````````````````