Spark: параллельное преобразование нескольких фреймов данных - PullRequest
0 голосов
/ 03 апреля 2019

Понимание того, как добиться наилучшего параллелизма при параллельном преобразовании нескольких фреймов данных

У меня есть массив путей

val paths = Array("path1", "path2", .....

Я загружаю фрейм данных из каждого пути, затем преобразую и записываю в путь назначения

paths.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})

Преобразование processData не зависит от загружаемого кадра данных.

Это ограничивает обработку одного фрейма данных за раз, и большинство ресурсов моего кластера простаивают. Поскольку обработка каждого кадра данных независима, я преобразовал Array в ParArray из scala.

paths.par.foreach(path => {
  val df = spark.read.parquet(path)
  df.transform(processData).write.parquet(path+"_processed")
})

Теперь он использует больше ресурсов в кластере. Я все еще пытаюсь понять, как это работает и как настроить параллельную обработку здесь

  1. Если я увеличу параллелизм scala по умолчанию с использованием ForkJoinPool до большего числа, может ли это привести к появлению большего количества потоков на стороне драйвера и будет в состоянии блокировки, ожидая завершения функции foreach и в конечном итоге уничтожения драйвера

  2. Как это влияет на такие централизованные искры, как EventLoggingListnener, которые должны обрабатывать больше притоков событий, когда несколько фреймов данных обрабатываются параллельно.

  3. Какие параметры следует учитывать для оптимального использования ресурсов.

  4. Любой другой подход

Любые ресурсы, которые я могу использовать, чтобы понять это масштабирование, были бы очень полезны

1 Ответ

0 голосов
/ 03 апреля 2019

Причина, по которой это происходит медленно, заключается в том, что spark очень хорошо распараллеливает вычисления для большого количества данных, хранящихся в одном большом кадре данных.Тем не менее, он очень плохо справляется с большим количеством информационных фреймов.Он запустит вычисления на одном, используя всех своих исполнителей (даже если они не все нужны), и дождется его завершения, прежде чем начинать следующий.Это приводит к большому количеству неактивных процессоров.Это плохо, но это не то, для чего была разработана искра.

У меня есть хак для вас.Там, возможно, нужно немного уточнить, но у вас есть идея.Вот что я бы сделал.Из списка путей я бы извлек все схемы файлов паркета и создал новую большую схему, которая собирает все столбцы.Затем я попросил бы спарк прочитать все файлы паркета, используя эту схему (отсутствующие столбцы будут автоматически установлены на ноль).Затем я объединил бы все кадры данных и выполнил преобразование на этом большом кадре данных и, наконец, использовал partitionBy, чтобы хранить кадры данных в отдельных файлах, в то же время делая все это параллельно.Это выглядело бы так:

// let create two sample datasets with one column in common (id)
// and two different columns x != y
val d1 = spark.range(3).withColumn("x", 'id * 10)
d1.show
+---+----+
| id|  x |
+---+----+
|  0|   0|
|  1|  10|
|  2|  20|
+---+----+

val d2 = spark.range(2).withColumn("y", 'id cast "string")
d2.show
+---+---+
| id|  y|
+---+---+
|  0|  0|
|  1|  1|
+---+---+

// And I store them
d1.write.parquet("hdfs:///tmp/d1.parquet")
d2.write.parquet("hdfs:///tmp/d2.parquet")
// Now let's create the big schema
val paths = Seq("hdfs:///tmp/d1.parquet", "hdfs:///tmp/d2.parquet")
val fields = paths
    .flatMap(path => spark.read.parquet(path).schema.fields)
    .toSet //removing duplicates
    .toArray
val big_schema = StructType(fields)

// and let's use it
val dfs = paths.map{ path => 
    spark.read
        .schema(big_schema)
        .parquet(path)
        .withColumn("path", lit(path.split("/").last))
}

// The we are ready to create one big dataframe
dfs.reduce( _ unionAll _).show
+---+----+----+----------+
| id|   x|   y|      file|
+---+----+----+----------+
|  1|   1|null|d1.parquet|
|  2|   2|null|d1.parquet|
|  0|   0|null|d1.parquet|
|  0|null|   0|d2.parquet|
|  1|null|   1|d2.parquet|
+---+----+----+----------+

Тем не менее, я не рекомендую использовать unionAll на большом количестве данных.Из-за спарк-анализа плана выполнения он может быть очень медленным со многими фреймами данных.Я бы использовал версию RDD, хотя она более многословна.

val rdds = sc.union(dfs.map(_.rdd))
// let's not forget to add the path to the schema
val big_df = spark.createDataFrame(rdds, 
    big_schema.add(StructField("path", StringType, true)))
transform(big_df)
    .write
    .partitionBy("path")
    .parquet("hdfs:///tmp/processed.parquet")

И, взглянув на мой обработанный каталог, я получаю следующее:

hdfs:///tmp/processed.parquet/_SUCCESS
hdfs:///tmp/processed.parquet/path=d1.parquet
hdfs:///tmp/processed.parquet/path=d2.parquet
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...