В кафке, при создании сообщения с транзакцией, смещение потребителя удвоилось - PullRequest
1 голос
/ 17 мая 2019

Я делаю проект, используя springboot 2, kafk 2.2.0, spring-kafka 2.2.5

Я создал kafka exactly once окружение, и создание и потребление сообщений были хорошими.

НО kafka-consumer-groups.sh сказал вот так.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test_topic      0          23              24              1   
test_topic      1          25              26              1   
test_topic      2          21              22              1   

Я просто отправляю Кафке только одно сообщение, но LOG-END-OFFSET удваивается, и 1 лаг остается всегда. (В моем Java-приложении производим и потребляем работаем по назначению)

Я не знаю, почему LOG-END-OFFSET удвоился.

При удалении exactly once config, нет проблем в LOG-END-OFFSET и CURRENT-OFFSET count.

Это мои kafkaTemplate коды установки.

@Bean
    @Primary
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerProperties = new HashMap<>();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092";
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // exactly once producer setup
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerProperties, new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
        factory.setTransactionIdPrefix("my.transaction.");
        return factory;
    }

    @Bean
    @Primary
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @Primary
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

Код моего производителя.

kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));

Я проверил, когда LOG-END-OFFSET удвоился, и это produce transaction commit синхронизация.

Что я сделал неправильной конфигурации?

1 Ответ

1 голос
/ 17 мая 2019

При использовании транзакций Kafka вставляет « контрольные партии » в журналы, чтобы указать, были ли сообщения частью транзакции.

Этим партиям также назначены смещения, поэтому вы видите, что смещения увеличиваются на 2, даже если вы отправили только одну запись.

Если вы хотите проверить себя, вы можете использовать инструмент DumpLogSegments для отображения содержимого ваших журналов и просмотра контрольного пакета:

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1558083247264 size: 10315 magic: 2 compresscodec: NONE crc: 3531536908 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 10315 CreateTime: 1558083247414 size: 78 magic: 2 compresscodec: NONE crc: 576574952 isvalid: true

Я использовал транзакционного производителя для отправки одной записи, и вы можете видеть, что вторая запись имеет isControl: true.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...