У меня есть этот код, и он выдает ошибку, basepath должен быть dir. Просто хочу запустить простую потоковую кафку Мойку.
val checkPointDir = "/tmp/offsets/" // "hdfs://hdfscluster/user/yarn/tmp/"
def main(args: Array[String]): Unit ={
lazy val spark = SparkSession
.builder
.appName("KafkaProducer")
.master("local[*]")
.getOrCreate()
val query = writeStream(jsonDF, "test")
query.awaitTermination()
}
def writeStream(df:DataFrame, topic:String): StreamingQuery = {
// log.warn("Writing to kafka")
df
// .selectExpr( "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("topic", topic)
.option("checkpointLocation", checkPointDir)
.outputMode(OutputMode.Update)
.start()
}
Мой пользователь является владельцем этой папки / tmp / offsets. Я получаю это исключение.
java.lang.IllegalArgumentException: опция 'basePath' должна быть каталогом