да, можно пропустить # 2. Запись в то же место может быть сделано с SaveMode.Overwrite
в том же месте, откуда вы читаете.
при первом чтении json, т. Е. № 1 в качестве информационного кадра, оно будет в памяти, если вы выполните кеширование. После этого вы можете выполнить очистку и объединить все json в единое целое с объединением и сохранить в файле паркета за один шаг. что-то вроде этого примера.
Случай 1: все jsons находятся в разных папках, и вы хотите, чтобы они сохраняли окончательный кадр данных как паркет в том же месте, где есть jsons ...
val dfpath1 = spark.read.json("path1")
val dfpath2 = spark.read.json("path2")
val dfpath3 = spark.read.json("path3")
val df1 = cleanup1 function dfpath1 returns dataframe
val df2 = cleanup2 function dfpath2 returns dataframe
val df3 = cleanup3 function dfpath3 returns dataframe
val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // you should have same schema while doing union..
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")
Случай 2 : все файлы jsons находятся в одних и тех же папках, и вы хотите, чтобы они сохраняли окончательный кадр данных в виде нескольких паркетов в одном root месте, где есть jsons ...
В этом случае нет необходимости читать как несколько фреймов данных, вы можете указать root путь, где есть jsons с той же схемой
val dfpath1 = spark.read.json("rootpathofyourjsons with same schema")
// or you can give multiple paths spark.read.json("path1","path2","path3")
// since it s supported by spark dataframe reader like this ...def json(paths: String*):
val finaldf = cleanup1 function returns dataframe
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")
AFAIK, в любом случае aws s3 sdk api is больше не требуется.
import org.apache.spark.sql.functions._
val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")
df.show(false)
df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read back again
val df2 = df1.withColumn("cleanup" , lit("Quick silver want to cleanup")) // like you said you want to clean it.
//BELOW 2 ARE IMPORTANT STEPS LIKE `cache` and `show` forcing a light action show(1) with out which FileNotFoundException will come.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // action
df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("quick silver saved in same directory where he read it from final records he saved after clean up are ")
df2.show(false)
Результат:
+---+----+
|sex|date|
+---+----+
|1 |10 |
|2 |20 |
|3 |30 |
+---+----+
+---+----+----------------------------+
|sex|date|cleanup |
+---+----+----------------------------+
|1 |10 |Quick silver want to cleanup|
|2 |20 |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows
quick silver saved in same directory where he read it from final records he saved after clean up are
+---+----+----------------------------+
|sex|date|cleanup |
+---+----+----------------------------+
|1 |10 |Quick silver want to cleanup|
|2 |20 |Quick silver want to cleanup|
|3 |30 |Quick silver want to cleanup|
+---+----+----------------------------+
Снимок экрана: файл сохранен, очистка при повторном чтении очищена и снова сохранена:
Примечание. Необходимо реализовать кейс 1 или кейс 2 , как предложенное обновление выше ...