Я обязан использовать
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
Использование устаревшей функциональности
val kafkaStream = KafkaUtils.createStream(streamingContext, zkArgs, consumerGroupId, topicMap)
kafkaStream.foreachRDD(rdd => {
val sqlContext = new SQLContext(sc)
Я прочитал, что использование водяных знаков вручную делается так:
// enabling watermarking upon success
val sparkConf = new SparkConf()
....
.set("zookeeper.hosts", zkArgs)
.set("enable.auto.commit", "false")
....
df.withWatermark("eventTime", "10 minutes")
.write .....
Следование за классом привело меня к таким классам, как EventTimeWatermark ...
В другом месте я прочитал, что я должен сам написать смещения что-то вроде:
def saveOffsets(zkClient: ZkClient, zkPath: String, rdd: RDD[_]): Unit = {
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
.mkString(",")
ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
}
Есть ли
df.withWatermark("eventTime", "10 minutes")
.write
..... со временем обновить водяной знак в Zookeeper? или в другом механизме на кластере работает искра?