Кафка - Синхронные транзакции - PullRequest
0 голосов
/ 10 апреля 2019

Как зафиксировать транзакции Kafka и сохранить порядок транзакций?

У меня есть транснациональная реализация производитель-потребитель.(читать одно сообщение за раз, затем генерировать одно или несколько сообщений в другой теме).Я хочу заблокировать выполнение моего кода (потребитель приостановки), пока транзакция не будет успешно завершена, а затем прочитать другое сообщение.

Ниже приведено краткое описание кода

// poll here returns one message and execute the block below
consumerRecord = consumer.poll(...) 
{       
   producer.beginTransaction()
   ...
   producer.send(producerRecord).get()
   ...
   producer.sendOffsetsToTransaction(offsets, consumerGroupId)
   producer.commitTransaction()
}

В моем коде:после выполнения producer().send(...).get() блок снова вызывается , поскольку потребитель опрашивает другую запись еще до совершения транзакции.

Как заблокировать при producer.commitTransaction() уровень, а не на producer.send()?

Я использую Kafka v2.2.0

...