Сохраняет ли Spark Kafka 0,8 непрямой поток KafkaUtils.createStream updateWaterMark сохранение смещений в Zookeeper? - PullRequest
0 голосов
/ 18 марта 2019

Я обязан использовать

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

Использование устаревшей функциональности

val kafkaStream = KafkaUtils.createStream(streamingContext, zkArgs, consumerGroupId, topicMap)

kafkaStream.foreachRDD(rdd => {

  val sqlContext = new SQLContext(sc)

Я прочитал, что использование водяных знаков вручную делается так:

//      enabling watermarking upon success
val sparkConf = new SparkConf()
  ....
  .set("zookeeper.hosts", zkArgs)
  .set("enable.auto.commit", "false")
  ....

df.withWatermark("eventTime", "10 minutes")
  .write .....

Следование за классом привело меня к таким классам, как EventTimeWatermark ...

В другом месте я прочитал, что я должен сам написать смещения что-то вроде:

def saveOffsets(zkClient:  ZkClient, zkPath: String, rdd: RDD[_]): Unit = {
  val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
  .mkString(",")

  ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
}

Есть ли

df.withWatermark("eventTime", "10 minutes")
      .write

..... со временем обновить водяной знак в Zookeeper? или в другом механизме на кластере работает искра?

1 Ответ

1 голос
/ 18 марта 2019

Поскольку водяные знаки делаются только при потоковой передаче Spark, поздние сообщения, полученные от Kafka, просто игнорируются в Spark.

Смещения Кафки обновляются по мере чтения сообщений.

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...