Я делаю проект, используя springboot 2, kafk 2.2.0, spring-kafka 2.2.5
Я создал kafka exactly once
окружение, и создание и потребление сообщений были хорошими.
НО kafka-consumer-groups.sh
сказал вот так.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test_topic 0 23 24 1
test_topic 1 25 26 1
test_topic 2 21 22 1
Я просто отправляю Кафке только одно сообщение, но LOG-END-OFFSET
удваивается, и 1 лаг остается всегда. (В моем Java-приложении производим и потребляем работаем по назначению)
Я не знаю, почему LOG-END-OFFSET удвоился.
При удалении exactly once
config, нет проблем в LOG-END-OFFSET
и CURRENT-OFFSET
count.
Это мои kafkaTemplate
коды установки.
@Bean
@Primary
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// exactly once producer setup
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
factory.setTransactionIdPrefix("my.transaction.");
return factory;
}
@Bean
@Primary
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@Primary
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
Код моего производителя.
kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));
Я проверил, когда LOG-END-OFFSET удвоился, и это produce transaction commit
синхронизация.
Что я сделал неправильной конфигурации?