Spark Listener выполнить хук на onJobComplete на исполнителях? - PullRequest
0 голосов
/ 16 января 2020

У меня есть простое искровое задание, которое читает данные CSV из S3, преобразует их, разбивает их на части и сохраняет в локальной файловой системе.

У меня есть файл CSV на s3 с содержанием ниже

пример ввода: Япония, 01-01-2020, погода, провайдер, устройство

case class WeatherReport(country:String, date:String, event:String, provide:String, device:String )

object SampleSpark extends App{

     val conf = new SparkConf()
      .setAppName("processing")
      .setIfMissing("spark.master", "local[*]")
      .setIfMissing("spark.driver.host", "localhost")

     val sc = new SparkContext(conf)

     val baseRdd = sc.textFile("s3a://mybucket/sample/*.csv")

     val weatherDataFrame = baseRdd
     .filter(_.trim.nonEmpty)
     .map(x => WeatherReport(x))
     .toDF()

     df.write.partitionBy("date")
      .mode(SaveMode.Append)
      .format("com.databricks.spark.csv")
      .save("outputDirectory")
}

Файл сохраняется в "outputDirectory / date = 01-01-2020 / part-" с более чем 1 частью файлы. Я хочу объединить файл детали и удалить префикс date= name, например "outputDirectory / 01-01-2020 / output.csv", и скопировать его в S3.

Как это можно сделать ??

Я думал об использовании SparkListener, как показано ниже, но я думаю, что он будет работать только на диске, но файлы будут присутствовать на Executors.

sparkContext.addListener(new SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        renameDirectory()
        mergePartFilesToSingleFiles()
        uploadFileToS3()
      }
})

Есть ли способ запустить пост завершения работы подключить Executors и Driver, которые синхронизируют c все локальные файлы на них с S3?

...