У меня есть несколько файлов паркета (около 1000).Мне нужно загрузить каждый из них, обработать его и сохранить результат в таблицу Hive.У меня есть цикл for, но кажется, что он работает только с 2 или 5 файлами, но не с 1000, поскольку кажется, что Sparks пытается загрузить их все одновременно, и мне нужно, чтобы это делалось по отдельности в одном сеансе Spark.
Я попытался использовать цикл for, затем a для каждого, и я использовал unpersist (), но в любом случае он не работает.
val ids = get_files_IDs()
ids.foreach(id => {
println("Starting file " + id)
var df = load_file(id)
var values_df = calculate_values(df)
values_df.write.mode(SaveMode.Overwrite).saveAsTable("table.values_" + id)
df.unpersist()
})
def get_files_IDs(): List[String] = {
var ids = sqlContext.sql("SELECT CAST(id AS varchar(10)) FROM table.ids WHERE id IS NOT NULL")
var ids_list = ids.select("id").map(r => r.getString(0)).collect().toList
return ids_list
}
def calculate_values(df:org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame ={
val values_id = df.groupBy($"id", $"date", $"hr_time").agg(avg($"value_a") as "avg_val_a", avg($"value_b") as "avg_value_b")
return values_id
}
def load_file(id:String): org.apache.spark.sql.DataFrame = {
val df = sqlContext.read.parquet("/user/hive/wh/table.db/parquet/values_for_" + id + ".parquet")
return df
}
Я ожидаю, что Spark загрузит файл с идентификатором 1, обработает данные, сохранит их в таблицу Hive, а затем отклонит эту дату и продолжит работу со вторым идентификатором и т. Д., Пока он не завершит работу.1000 файлов.Вместо того, чтобы пытаться загрузить все одновременно.
Любая помощь будет принята с благодарностью!Я застрял на нем в течение нескольких дней.Я использую Spark 1.6 с Scala Спасибо !!
РЕДАКТИРОВАТЬ: Добавлены определения.Надеюсь, что это может помочь получить лучшее представление.Спасибо!