Я хочу сохранить файлы в нескольких местах назначения, используя foreachBatch
, код работает нормально, но foreachBatch
работает не так, как хотелось.Пожалуйста, помогите мне с этим, если у вас есть какие-либо подсказки.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.streaming._
import org.apache.spark.storage.StorageLevel
object multiDestination {
val spark = SparkSession.builder()
.master("local")
.appName("Writing data to multiple destinations")
.getOrCreate()
def main(args: Array[String]): Unit = {
val mySchema = StructType(Array(
StructField("Id", IntegerType),
StructField("Name", StringType)
))
val askDF = spark
.readStream
.format("csv")
.option("header","true")
.schema(mySchema)
.load("/home/amulya/Desktop/csv/")
//println(askDF.show())
println(askDF.isStreaming)
askDF.writeStream.foreachBatch { (askDF : DataFrame , batchId:Long) =>
askDF.persist()
println("Ask")
askDF.write.format("avro").save("/home/amulya/Desktop/md1.avro")
askDF.write.json("/home/amulya/Desktop/md2.json")
askDF.write.parquet("/home/amulya/Desktop/md3.parquet")
askDF.write.csv("/home/amulya/Desktop/md4.csv")
askDF.unpersist()
}.start().awaitTermination()
}
}
Ошибка в компиляторе: - java.lang.IllegalArgumentException: Option 'basePath' must be a directory