Мы работаем с Кафкой уже около года.Около месяца назад мы столкнулись с проблемой, когда один из наших производителей, используя клиент Java, получает исключение InvalidTxnStateException при попытке зафиксировать.С этого момента брокер игнорирует этого производителя, пока он не будет перезагружен.Под игнором я подразумеваю, что когда производитель раскручивается и вызывает initTransaction, время ожидания истекает бесконечно.(пока мы не перезагружаем посредника) Затем, после перезагрузки посредника, мы иногда видим дублирующиеся сообщения.Буду очень признателен за любую помощь или понимание, даже если это просто руководство о том, как отладить такие проблемы.Мы рассматриваем возможность обновления до Kafka 2.0 в понедельник, но я обеспокоен, что эта проблема не исчезнет.
Наша текущая система:
- 3 Kafka Brokers, работающие под управлением Kafka 1.0 (2.12-1.0.0)каждый на своих собственных экземплярах AWS Ubuntu 14.04 - Состояния транзакций имеют коэффициент репликации 2
- 3 экземпляра Zookeeper на своих экземплярах
- 64 разделов по теме с коэффициентом репликации 2
- ~ 60 Производители, работающие с Java-клиентом (версия 2.12-1.0.0)
Конфигурации производителя:
props.put("bootstrap.servers", bootstrapServers);
props.put("transactional.id", transactionId); // hostname of instance
props.put("compression.type", "gzip");
props.put("max.block.ms", 10000);
props.put("enable.idempotence", "true");
props.put("linger.ms", 500);
props.put("batch.size",1048576);
Основные характеристики кода производителя:
// This gets called once at start up
KafkaProducer kafkaProducer = new KafkaProducer<>(this.properties, new StringSerializer(), new StringSerializer());
kafkaProducer.initTransactions();
// This, gets called repeatedly as files get written into a directory...
FileReader fr = null;
BufferedReader br = null;
try {
fr = new FileReader(logFile.getAbsolutePath());
br = new BufferedReader(fr);
kafkaProducer.beginTransaction();
String line;
while ((line = br.readLine()) != null) {
kafkaProducer.send(new ProducerRecord<String, String>(this.topic,null,epoch,key,line)); // key is host name + rand int
}
this.kafkaProducer.commitTransaction();
} catch (IOException e) {
System.out.println("Could not open file!");
e.printStackTrace();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Guides say to catch exception but in testing exception came about by "caused by"
System.out.println("Unrecoverable exception!");
e.printStackTrace();
kafkaProducer.close();
kafkaProducer = null;
kafkaProducer = createKafkaProducer();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
e.printStackTrace();
System.out.println("Kafka exception: aborting!");
if (e.getCause() != null && (e.getCause() instanceof ProducerFencedException || e.getCause() instanceof OutOfOrderSequenceException || e.getCause() instanceof AuthorizationException || e.getCause() instanceof InvalidTxnStateException)) {
System.out.println("Unrecoverable exception!");
kafkaProducer.close();
kafkaProducer = null;
kafkaProducer = createKafkaProducer();
} else {
kafkaProducer.abortTransaction();
}
}
Ошибка источника:
org.apache.kafka.common.errors.InvalidTxnStateException: Производитель попытался выполнить транзакцию в недопустимом состоянии
Журналы брокера:
[2018-09-21 08: 19: 05,037] ИНФОРМАЦИЯ Удаление индекса /tmp/kafka-logs/logs/prod_tx0-63/00000000000870963261.timeindex.deleted (kafka.log.TimeIndex)
[2018-09-21 08: 19: 16,071] ОШИБКА[ReplicaManager broker = 3] Ошибка обработки операции добавления операции на разделе prod_tx0-58 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.OutOfOrderSequenceException: порядковый номер не по порядку для ManufacturerId 94001: 233824 (входящий след.номер), 228787 (текущий конечный порядковый номер)
[2018-09-21 08: 19: 20,808] ОШИБКА [ReplicaManager broker = 3] Ошибка обработки операции добавления на раздел prod_tx0-11 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.OutOfOrderSequenceException: порядковый номер не по порядку для идентификатор производителя 90031: 448132 (порядковый номер входящего), 443075 (порядковый номер текущего конца)
[2018-09-21 08: 21: 14,656] ОШИБКА [ReplicaManager broker = 3] Ошибка обработки операции добавления операции над разделом prod_tx0-61 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.OutOfOrderSequenceException: порядковый номер неупорядоченного заказа для ManufacturerId 94006: 599(входящий порядковый номер), 594332 (текущий порядковый номер конца)
[2018-09-21 08: 22: 00,222] INFO Свернут новый сегмент журнала для 'prod_tx0-44' за 0 мс.(kafka.log.Log)
[2018-09-21 08: 22: 07,055] ОШИБКА [ReplicaManager broker = 3] Ошибка обработки операции добавления на раздел prod_tx0-49 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.OutOfOrderSequenceException: порядковый номер не по порядку для идентификатор производителя 93006: 213516 (входящий последовательный номер), 208514 (порядковый номер текущего конца)
[2018-09-21 08:22:18,365] ИНФОРМАЦИЯ Увеличение смещения начала журнала раздела prod_tx0-28 до 870795767 в каталоге dir / tmp / kafka-logs / logs (kafka.log.Log)
Примечания:
- Первоначально при создании кластера тема транзакции_стата имела коэффициент репликации 2. Чтобы повысить доступность, мы увеличили коэффициент репликации с помощью инструментов в папке bin.В другом посте SO я читал, что это иногда приводит к ошибкам, но это маловероятно.
- Каждый производитель отправляет брокеру файлы размером около 65 МБ в сообщениях размером около 100 КБ, а затем фиксирует транзакцию.Я не смог найти ничего, что указывало бы, что было оптимальное время или размер, прежде чем транзакция должна была быть зафиксирована.
- Потребители используют commitSync, поэтому я относительно уверен, что дублированные записи не происходят на стороне пользователя.
Еще раз спасибо, что нашли время, чтобы прочитать.