Если честно, я протестировал его с нормальной кафкой без схемы транзакции, и он не пропускает смещение, когда я пытаюсь повторно запустить ProducerTest много раз.
object ProducerTest extends LazyLogging {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka.local:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")
props.put("retries", "3")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("zxc", "key", "value")
val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
producer.send(record)
producer.send(record2)
producer.send(record3)
Thread.sleep(3000)
}
}
Но когда я включить транзакцию для производителя, она пропустит одно смещение при повторном запуске приложения ProducerTestWithTransaction. Как и в первый раз, он имеет смещение 0,1,2, затем после повторного запуска будет 4,5,6, что пропускает 3 и т. Д. И т. Д.
object ProducerTestWithTransaction extends LazyLogging {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka.local:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("enable.idempotence", "true")
props.put("transactional.id", "alona")
props.put("acks", "all")
props.put("retries", "3")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("wew", "key", "value")
val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
producer.initTransactions()
try {
producer.beginTransaction()
producer.send(record)
producer.send(record2)
producer.send(record3)
producer.commitTransaction()
} catch {
case e: ProducerFencedException => producer.close()
case e: Exception => producer.abortTransaction();
}
}
}
Пожалуйста, просветите меня, почему это происходит? Есть ли обходной путь, чтобы не пропустить смещение. Спасибо! Примечание: я использую kafka-клиентов 2.4.0 и wurstmeister / kafka: 2.12-2.3.0