Причина, по которой это происходит медленно, заключается в том, что spark очень хорошо распараллеливает вычисления для большого количества данных, хранящихся в одном большом кадре данных.Тем не менее, он очень плохо справляется с большим количеством информационных фреймов.Он запустит вычисления на одном, используя всех своих исполнителей (даже если они не все нужны), и дождется его завершения, прежде чем начинать следующий.Это приводит к большому количеству неактивных процессоров.Это плохо, но это не то, для чего была разработана искра.
У меня есть хак для вас.Там, возможно, нужно немного уточнить, но у вас есть идея.Вот что я бы сделал.Из списка путей я бы извлек все схемы файлов паркета и создал новую большую схему, которая собирает все столбцы.Затем я попросил бы спарк прочитать все файлы паркета, используя эту схему (отсутствующие столбцы будут автоматически установлены на ноль).Затем я объединил бы все кадры данных и выполнил преобразование на этом большом кадре данных и, наконец, использовал partitionBy
, чтобы хранить кадры данных в отдельных файлах, в то же время делая все это параллельно.Это выглядело бы так:
// let create two sample datasets with one column in common (id)
// and two different columns x != y
val d1 = spark.range(3).withColumn("x", 'id * 10)
d1.show
+---+----+
| id| x |
+---+----+
| 0| 0|
| 1| 10|
| 2| 20|
+---+----+
val d2 = spark.range(2).withColumn("y", 'id cast "string")
d2.show
+---+---+
| id| y|
+---+---+
| 0| 0|
| 1| 1|
+---+---+
// And I store them
d1.write.parquet("hdfs:///tmp/d1.parquet")
d2.write.parquet("hdfs:///tmp/d2.parquet")
// Now let's create the big schema
val paths = Seq("hdfs:///tmp/d1.parquet", "hdfs:///tmp/d2.parquet")
val fields = paths
.flatMap(path => spark.read.parquet(path).schema.fields)
.toSet //removing duplicates
.toArray
val big_schema = StructType(fields)
// and let's use it
val dfs = paths.map{ path =>
spark.read
.schema(big_schema)
.parquet(path)
.withColumn("path", lit(path.split("/").last))
}
// The we are ready to create one big dataframe
dfs.reduce( _ unionAll _).show
+---+----+----+----------+
| id| x| y| file|
+---+----+----+----------+
| 1| 1|null|d1.parquet|
| 2| 2|null|d1.parquet|
| 0| 0|null|d1.parquet|
| 0|null| 0|d2.parquet|
| 1|null| 1|d2.parquet|
+---+----+----+----------+
Тем не менее, я не рекомендую использовать unionAll
на большом количестве данных.Из-за спарк-анализа плана выполнения он может быть очень медленным со многими фреймами данных.Я бы использовал версию RDD, хотя она более многословна.
val rdds = sc.union(dfs.map(_.rdd))
// let's not forget to add the path to the schema
val big_df = spark.createDataFrame(rdds,
big_schema.add(StructField("path", StringType, true)))
transform(big_df)
.write
.partitionBy("path")
.parquet("hdfs:///tmp/processed.parquet")
И, взглянув на мой обработанный каталог, я получаю следующее:
hdfs:///tmp/processed.parquet/_SUCCESS
hdfs:///tmp/processed.parquet/path=d1.parquet
hdfs:///tmp/processed.parquet/path=d2.parquet