Я использую Spring Kafka и Spring Boot и просто задаюсь вопросом, как настроить моего потребителя, например:
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
// do some logic
ack.acknowledge();
}
, чтобы использовать точно однократную гарантию доставки?
Должен ли я только добавить org.springframework.transaction.annotation.Transactional
аннотации по sendPost
методу и это все или мне нужно выполнить несколько дополнительных шагов для достижения этой цели?
ОБНОВЛЕНО
Это мой текущий конфиг
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTransactionManager<Object, Object> transactionManager) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
//factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setTransactionManager(transactionManager);
factory.setConsumerFactory(consumerFactory(kafkaProperties));
return factory;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
return props;
}
@Bean
public ProducerFactory<String, Post> postProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Post> postKafkaTemplate() {
return new KafkaTemplate<>(postProducerFactory());
}
@Bean
public ProducerFactory<String, Update> updateProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Update> updateKafkaTemplate() {
return new KafkaTemplate<>(updateProducerFactory());
}
@Bean
public ProducerFactory<String, Message> messageProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Message> messageKafkaTemplate() {
return new KafkaTemplate<>(messageProducerFactory());
}
, но происходит сбой со следующей ошибкой:
***************************
APPLICATION FAILED TO START
***************************
Description:
Parameter 0 of method kafkaTransactionManager in org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration required a single bean, but 3 were found:
- postProducerFactory: defined by method 'postProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
- updateProducerFactory: defined by method 'updateProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
- messageProducerFactory: defined by method 'messageProducerFactory' in class path resource [com/example/domain/configuration/messaging/KafkaProducerConfig.class]
Что я делаю не так?