Сохранить смещение сообщения в Kafka с помощью KafkaUtils.createDirectStream - PullRequest
0 голосов
/ 14 января 2019

Как сохранить смещение сообщения в Kafka, если я использую KafkaUtils.createDirectStream для чтения сообщений. Кафка теряет значение смещения каждый раз, когда приложение выходит из строя. Затем оно читает значение, предоставленное в auto.offset.reset (которое является последним), и не может прочитать сообщения в интервале остановки и запуска приложения.

1 Ответ

0 голосов
/ 14 января 2019

Этого можно избежать, зафиксировав смещение вручную. Задайте для enable.auto.commit значение false, а затем используйте приведенный ниже код для фиксации смещения в kafka после успешной операции.

  var offsetRanges = Array[OffsetRange]()

          val valueStream = stream.transform {
            rdd =>
              offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              rdd
          }.map(_.value())
//operation
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

Вы также можете прочитать этот документ, который даст вам хорошее понимание управления смещением https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

...