Невозможно создать несколько файлов, используя foreachBatch в spark (этот код работает сейчас) - PullRequest
0 голосов
/ 19 сентября 2019

Я хочу сохранить файлы в нескольких местах назначения, используя foreachBatch, код работает нормально, но foreachBatch работает не так, как хотелось.Пожалуйста, помогите мне с этим, если у вас есть какие-либо подсказки.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.streaming._
import org.apache.spark.storage.StorageLevel

object multiDestination {

  val spark = SparkSession.builder()
    .master("local")
    .appName("Writing data to multiple destinations")
    .getOrCreate()

  def main(args: Array[String]): Unit = {

    val mySchema = StructType(Array(
      StructField("Id", IntegerType),
      StructField("Name", StringType)
    ))

    val askDF = spark
      .readStream
      .format("csv")
     .option("header","true")
      .schema(mySchema)
      .load("/home/amulya/Desktop/csv/")
//println(askDF.show())
  println(askDF.isStreaming)

      askDF.writeStream.foreachBatch { (askDF : DataFrame , batchId:Long) =>
          askDF.persist()
          println("Ask")
          askDF.write.format("avro").save("/home/amulya/Desktop/md1.avro")
          askDF.write.json("/home/amulya/Desktop/md2.json")
          askDF.write.parquet("/home/amulya/Desktop/md3.parquet")
          askDF.write.csv("/home/amulya/Desktop/md4.csv")
          askDF.unpersist()
        }.start().awaitTermination()
  }
}

Ошибка в компиляторе: - java.lang.IllegalArgumentException: Option 'basePath' must be a directory

...