Кафка, игнорирующая `action.timeout.ms` для продюсера - PullRequest
1 голос
/ 05 июня 2019

Я настраиваю производителя на 10-секундный таймаут, используя свойство transaction.timeout.ms. Однако кажется, что транзакция прерывается через 60 секунд, что намного дольше.

См. Следующую программу:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokerConnectionString);
properties.setProperty("transactional.id", "my-transactional-id");
properties.setProperty("transaction.timeout.ms", "5000");

// start the first producer and write one event
KafkaProducer<String, String> producer =
        new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic", "value"));
// note the transaction is left non-completed

// start another producer with different txn.id and write second event
properties.setProperty("transactional.id", "another-transactional-id");
KafkaProducer<String, String> producer2 =
        new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
producer2.initTransactions();
producer2.beginTransaction();
producer2.send(new ProducerRecord<>("topic", "value2"));
producer2.commitTransaction();

// consume the events using read-committed
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", brokerConnectionString);
consumerProps.setProperty("group.id", "my-group");
consumerProps.setProperty("auto.offset.reset", "earliest");
consumerProps.setProperty("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = 
        new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(singleton("topic"));
while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
        logger.info(record.toString());
    }
}

value2 печатается примерно через 60 секунд, что является значением по умолчанию для параметра transaction.timeout.ms. Я правильно понимаю собственность?

1 Ответ

1 голос
/ 05 июня 2019

В процессе написания вопроса я нашел ответ. Брокер настроен на проверку тайм-аутов производителей каждые 60 секунд, поэтому транзакция прерывается при следующей проверке. Это свойство настраивает его: transaction.abort.timed.out.transaction.cleanup.interval.ms. Я начал работу с брокером как раз перед тестированием, поэтому на это всегда уходило около 60 секунд.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...