Как обрабатывать несколько файлов паркета по отдельности в цикле for? - PullRequest
0 голосов
/ 10 февраля 2019

У меня есть несколько файлов паркета (около 1000).Мне нужно загрузить каждый из них, обработать его и сохранить результат в таблицу Hive.У меня есть цикл for, но кажется, что он работает только с 2 или 5 файлами, но не с 1000, поскольку кажется, что Sparks пытается загрузить их все одновременно, и мне нужно, чтобы это делалось по отдельности в одном сеансе Spark.

Я попытался использовать цикл for, затем a для каждого, и я использовал unpersist (), но в любом случае он не работает.

val ids = get_files_IDs()
ids.foreach(id => {
println("Starting file " + id)
var df = load_file(id)
var values_df = calculate_values(df)
values_df.write.mode(SaveMode.Overwrite).saveAsTable("table.values_" + id)
df.unpersist()
})

def get_files_IDs(): List[String] = {
var ids = sqlContext.sql("SELECT CAST(id AS varchar(10)) FROM  table.ids WHERE id IS NOT NULL")
var ids_list = ids.select("id").map(r => r.getString(0)).collect().toList
return ids_list
}

def calculate_values(df:org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame ={
val values_id = df.groupBy($"id", $"date", $"hr_time").agg(avg($"value_a") as "avg_val_a", avg($"value_b") as "avg_value_b")
return values_id
}

def load_file(id:String): org.apache.spark.sql.DataFrame = {
val df = sqlContext.read.parquet("/user/hive/wh/table.db/parquet/values_for_" + id + ".parquet")
return df
}

Я ожидаю, что Spark загрузит файл с идентификатором 1, обработает данные, сохранит их в таблицу Hive, а затем отклонит эту дату и продолжит работу со вторым идентификатором и т. Д., Пока он не завершит работу.1000 файлов.Вместо того, чтобы пытаться загрузить все одновременно.

Любая помощь будет принята с благодарностью!Я застрял на нем в течение нескольких дней.Я использую Spark 1.6 с Scala Спасибо !!

РЕДАКТИРОВАТЬ: Добавлены определения.Надеюсь, что это может помочь получить лучшее представление.Спасибо!

1 Ответ

0 голосов
/ 13 марта 2019

Хорошо, после долгих проверок я понял, что процесс работает нормально.Он обрабатывал каждый файл индивидуально и сохранял результаты.Проблема заключалась в том, что в некоторых очень специфических случаях процесс шел очень долго.

Так что я могу сказать, что с помощью цикла for или для каждого вы можете обрабатывать несколько файлов и сохранять результаты без проблем.Unpersisting и очистка кеша влияет на производительность.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...