У меня есть Java Сервис, который читает сообщения от Кафки. Сервис очень прост. У меня есть слушатель:
@KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.topic.group}", containerFactory = "mesagueKafkaListenerContainerFactory")
Затем у меня есть этот conf:
@Autowired
private KafkaProperty kafkaProperty;
private ConsumerFactory<String, KafkaMessage> mesagueConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.kafkaProperty.getGroupIdTest());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(KafkaMessage.class));
}
private ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperty.getServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.kafkaProperty.getAutoCommit());
return new DefaultKafkaConsumerFactory(props, new StringDeserializer(), new JsonDeserializer<>(KafkaMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> mesagueKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(mesagueConsumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setErrorHandler(new KafkaErrorHandler());
return factory;
}
И затем:
kafka:
topic:
name: name
group: 1
bootstrap:
servers: xxx
autoCommit: false
Когда я отправляю сообщение в очередь , службы обрабатывают его ОК.
Но когда я перезапускаю службу, она снова читает все сообщения из очереди (сообщения уже обработаны)
Я хочу обрабатывать только новые сообщения, которые не обрабатываются.
Заранее спасибо.