Транзакция Kafka - пропустить одно смещение, когда приложение останавливается и запускается снова - PullRequest
0 голосов
/ 13 февраля 2020

Если честно, я протестировал его с нормальной кафкой без схемы транзакции, и он не пропускает смещение, когда я пытаюсь повторно запустить ProducerTest много раз.

object ProducerTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("zxc", "key", "value")
    val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
    producer.send(record)
    producer.send(record2)
    producer.send(record3)
    Thread.sleep(3000)
  }
}

Но когда я включить транзакцию для производителя, она пропустит одно смещение при повторном запуске приложения ProducerTestWithTransaction. Как и в первый раз, он имеет смещение 0,1,2, затем после повторного запуска будет 4,5,6, что пропускает 3 и т. Д. И т. Д.

object ProducerTestWithTransaction extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("enable.idempotence", "true")
    props.put("transactional.id", "alona")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("wew", "key", "value")
    val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
    producer.initTransactions()
    try {
      producer.beginTransaction()
      producer.send(record)
      producer.send(record2)
      producer.send(record3)
      producer.commitTransaction()
    } catch {
      case e: ProducerFencedException => producer.close()
      case e: Exception => producer.abortTransaction();
    }
  }
}

Пожалуйста, просветите меня, почему это происходит? Есть ли обходной путь, чтобы не пропустить смещение. Спасибо! Примечание: я использую kafka-клиентов 2.4.0 и wurstmeister / kafka: 2.12-2.3.0

1 Ответ

0 голосов
/ 13 февраля 2020

Транзакции вставляют события маркера, AFAIK, которые могут быть тем, что вы видите

Однако в обоих кодах producer.flush() должен вызываться в ловушке завершения работы

...