У меня есть простое искровое задание, которое читает данные CSV из S3, преобразует их, разбивает их на части и сохраняет в локальной файловой системе.
У меня есть файл CSV на s3 с содержанием ниже
пример ввода: Япония, 01-01-2020, погода, провайдер, устройство
case class WeatherReport(country:String, date:String, event:String, provide:String, device:String )
object SampleSpark extends App{
val conf = new SparkConf()
.setAppName("processing")
.setIfMissing("spark.master", "local[*]")
.setIfMissing("spark.driver.host", "localhost")
val sc = new SparkContext(conf)
val baseRdd = sc.textFile("s3a://mybucket/sample/*.csv")
val weatherDataFrame = baseRdd
.filter(_.trim.nonEmpty)
.map(x => WeatherReport(x))
.toDF()
df.write.partitionBy("date")
.mode(SaveMode.Append)
.format("com.databricks.spark.csv")
.save("outputDirectory")
}
Файл сохраняется в "outputDirectory / date = 01-01-2020 / part-" с более чем 1 частью файлы. Я хочу объединить файл детали и удалить префикс date=
name, например "outputDirectory / 01-01-2020 / output.csv", и скопировать его в S3.
Как это можно сделать ??
Я думал об использовании SparkListener, как показано ниже, но я думаю, что он будет работать только на диске, но файлы будут присутствовать на Executors.
sparkContext.addListener(new SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
renameDirectory()
mergePartFilesToSingleFiles()
uploadFileToS3()
}
})
Есть ли способ запустить пост завершения работы подключить Executors и Driver, которые синхронизируют c все локальные файлы на них с S3?