Как зафиксировать транзакции Kafka и сохранить порядок транзакций?
У меня есть транснациональная реализация производитель-потребитель.(читать одно сообщение за раз, затем генерировать одно или несколько сообщений в другой теме).Я хочу заблокировать выполнение моего кода (потребитель приостановки), пока транзакция не будет успешно завершена, а затем прочитать другое сообщение.
Ниже приведено краткое описание кода
// poll here returns one message and execute the block below
consumerRecord = consumer.poll(...)
{
producer.beginTransaction()
...
producer.send(producerRecord).get()
...
producer.sendOffsetsToTransaction(offsets, consumerGroupId)
producer.commitTransaction()
}
В моем коде:после выполнения producer().send(...).get()
блок снова вызывается , поскольку потребитель опрашивает другую запись еще до совершения транзакции.
Как заблокировать при producer.commitTransaction()
уровень, а не на producer.send()
?
Я использую Kafka v2.2.0