Spark File Streaming получает имена файлов - PullRequest
1 голос
/ 13 октября 2019

Мне нужно знать fileName для входного файла, который транслируется из входного каталога.

Ниже приведен код искры FileStreaming в scala-программировании

object FileStreamExample {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    //create stream from folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    //fileNames() //even here it is throwing the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames();

    query.awaitTermination()

  }}

Но перед лицом приведенного ниже исключенияво время потоковой передачи

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]

1 Ответ

0 голосов
/ 15 октября 2019

Вы можете использовать функцию input_file_name(), определенную в org.apache.spark.sql.functions._, чтобы получить имя файла, из которого строки импортируются в кадр данных.

sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())
...