Я пытаюсь опубликовать sh в транзакции сообщение на 16 разделах Kafka на 7 брокерах.
Поток выглядит так:
- открытая транзакция
- написать сообщение на 16 разделов
- совершить транзакцию
- спать 25 мс
- повторить
Иногда транзакция занимает более 1 секунды для полный, в среднем 50 мс. После включения ведения журнала трассировки на стороне производителя я заметил следующую ошибку:
TRACE internals.TransactionManager [kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1, transactionalId=cma-2]
Received transactional response AddPartitionsToTxnResponse(errors={modelapp-ecb-0=CONCURRENT_TRANSACTIONS, modelapp-ecb-9=CONCURRENT_TRANSACTIONS, modelapp-ecb-10=CONCURRENT_TRANSACTIONS, modelapp-ecb-11=CONCURRENT_TRANSACTIONS, modelapp-ecb-12=CONCURRENT_TRANSACTIONS, modelapp-ecb-13=CONCURRENT_TRANSACTIONS, modelapp-ecb-14=CONCURRENT_TRANSACTIONS, modelapp-ecb-15=CONCURRENT_TRANSACTIONS, modelapp-ecb-1=CONCURRENT_TRANSACTIONS, modelapp-ecb-2=CONCURRENT_TRANSACTIONS, modelapp-ecb-3=CONCURRENT_TRANSACTIONS, modelapp-ecb-4=CONCURRENT_TRANSACTIONS, modelapp-ecb-5=CONCURRENT_TRANSACTIONS, modelapp-ecb-6=CONCURRENT_TRANSACTIONS, modelapp-ecb-=CONCURRENT_TRANSACTIONS, modelapp-ecb-8=CONCURRENT_TRANSACTIONS}, throttleTimeMs=0)
for request (type=AddPartitionsToTxnRequest, transactionalId=cma-2, producerId=59003, producerEpoch=0, partitions=[modelapp-ecb-0, modelapp-ecb-9, modelapp-ecb-10, modelapp-ecb-11, modelapp-ecb-12, modelapp-ecb-13, modelapp-ecb-14, modelapp-ecb-15, modelapp-ecb-1, modelapp-ecb-2, modelapp-ecb-3, modelapp-ecb-4, modelapp-ecb-5, modelapp-ecb-6, modelapp-ecb-7, modelapp-ecb-8])
Производитель Kafka повторяет отправку AddPartitionsToTxnRequest (s) несколько раз, пока это не удается, но это приводит к задержкам.
Код выглядит следующим образом:
Properties producerProperties = PropertiesUtil.readPropertyFile(_producerPropertiesFile);
_producer = new KafkaProducer<>(producerProperties);
_producer.initTransactions();
_producerService = Executors.newSingleThreadExecutor(new NamedThreadFactory(getClass().getSimpleName()));
_producerService.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
_producer.beginTransaction();
for (int partition = 0; partition < _numberOfPartitions; partition++)
_producer.send(new ProducerRecord<>(_producerTopic, partition, KafkaRecordKeyFormatter.formatControlMessageKey(_messageNumber, token), EMPTY_BYTE_ARRAY));
_producer.commitTransaction();
_messageNumber++;
Thread.sleep(_timeBetweenProducedMessagesInMillis);
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException | UnsupportedVersionException e) {
closeProducer();
break;
} catch (KafkaException e) {
_producer.abortTransaction();
} catch (InterruptedException e) {...}
}
});
Глядя на код брокера, кажется, есть 2 случая, когда эта ошибка выдается, но я не могу сказать, почему я туда попал
object TransactionCoordinator {
...
def handleAddPartitionsToTransaction(...): Unit = {
...
if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
Left(Errors.CONCURRENT_TRANSACTIONS)
}
...
}
...
}
Спасибо заранее за помощью!
Позднее редактирование:
Включив ведение журнала трассировки на брокере, мы смогли увидеть, что брокер отправляет производителю ответ END_TXN до транзакция достигает состояния CompleteCommit. Производитель может начать новую транзакцию, которая отклоняется посредником, пока он еще находится в переходе PrepareCommit -> CompleteCommit.