Транзакция Kafka: получение CONCURRENT_TRANSACTIONS в AddPartitionsToTxnRequest - PullRequest
2 голосов
/ 30 января 2020

Я пытаюсь опубликовать sh в транзакции сообщение на 16 разделах Kafka на 7 брокерах.

Поток выглядит так:

  1. открытая транзакция
  2. написать сообщение на 16 разделов
  3. совершить транзакцию
  4. спать 25 мс
  5. повторить

Иногда транзакция занимает более 1 секунды для полный, в среднем 50 мс. После включения ведения журнала трассировки на стороне производителя я заметил следующую ошибку:

TRACE internals.TransactionManager [kafka-producer-network-thread | producer-1] - [Producer clientId=producer-1, transactionalId=cma-2] 
Received transactional response AddPartitionsToTxnResponse(errors={modelapp-ecb-0=CONCURRENT_TRANSACTIONS, modelapp-ecb-9=CONCURRENT_TRANSACTIONS, modelapp-ecb-10=CONCURRENT_TRANSACTIONS, modelapp-ecb-11=CONCURRENT_TRANSACTIONS, modelapp-ecb-12=CONCURRENT_TRANSACTIONS, modelapp-ecb-13=CONCURRENT_TRANSACTIONS, modelapp-ecb-14=CONCURRENT_TRANSACTIONS, modelapp-ecb-15=CONCURRENT_TRANSACTIONS, modelapp-ecb-1=CONCURRENT_TRANSACTIONS, modelapp-ecb-2=CONCURRENT_TRANSACTIONS, modelapp-ecb-3=CONCURRENT_TRANSACTIONS, modelapp-ecb-4=CONCURRENT_TRANSACTIONS, modelapp-ecb-5=CONCURRENT_TRANSACTIONS, modelapp-ecb-6=CONCURRENT_TRANSACTIONS, modelapp-ecb-=CONCURRENT_TRANSACTIONS, modelapp-ecb-8=CONCURRENT_TRANSACTIONS}, throttleTimeMs=0) 
for request (type=AddPartitionsToTxnRequest, transactionalId=cma-2, producerId=59003, producerEpoch=0, partitions=[modelapp-ecb-0, modelapp-ecb-9, modelapp-ecb-10, modelapp-ecb-11, modelapp-ecb-12, modelapp-ecb-13, modelapp-ecb-14, modelapp-ecb-15, modelapp-ecb-1, modelapp-ecb-2, modelapp-ecb-3, modelapp-ecb-4, modelapp-ecb-5, modelapp-ecb-6, modelapp-ecb-7, modelapp-ecb-8])

Производитель Kafka повторяет отправку AddPartitionsToTxnRequest (s) несколько раз, пока это не удается, но это приводит к задержкам.

Код выглядит следующим образом:

Properties producerProperties = PropertiesUtil.readPropertyFile(_producerPropertiesFile);
_producer = new KafkaProducer<>(producerProperties);
_producer.initTransactions();

_producerService = Executors.newSingleThreadExecutor(new NamedThreadFactory(getClass().getSimpleName()));
_producerService.submit(() -> {
    while (!Thread.currentThread().isInterrupted()) {

        try {
            _producer.beginTransaction();
            for (int partition = 0; partition < _numberOfPartitions; partition++) 
                _producer.send(new ProducerRecord<>(_producerTopic, partition, KafkaRecordKeyFormatter.formatControlMessageKey(_messageNumber, token), EMPTY_BYTE_ARRAY));

            _producer.commitTransaction();
            _messageNumber++;
            Thread.sleep(_timeBetweenProducedMessagesInMillis);
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException | UnsupportedVersionException e) {
            closeProducer();
            break;
        } catch (KafkaException e) {
            _producer.abortTransaction();
        } catch (InterruptedException e) {...} 
    }
});

Глядя на код брокера, кажется, есть 2 случая, когда эта ошибка выдается, но я не могу сказать, почему я туда попал

object TransactionCoordinator {
...
    def handleAddPartitionsToTransaction(...): Unit = {
    ...
        if (txnMetadata.pendingTransitionInProgress) {
            // return a retriable exception to let the client backoff and retry
            Left(Errors.CONCURRENT_TRANSACTIONS)
        } else if (txnMetadata.state == PrepareCommit || txnMetadata.state == PrepareAbort) {
            Left(Errors.CONCURRENT_TRANSACTIONS)
        }
    ...
    }
...
}

Спасибо заранее за помощью!

Позднее редактирование:

Включив ведение журнала трассировки на брокере, мы смогли увидеть, что брокер отправляет производителю ответ END_TXN до транзакция достигает состояния CompleteCommit. Производитель может начать новую транзакцию, которая отклоняется посредником, пока он еще находится в переходе PrepareCommit -> CompleteCommit.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...