Эффективный способ объединения нескольких таблиц в Spark - на устройстве не осталось места - PullRequest
3 голосов
/ 14 марта 2019

Подобный вопрос был задан здесь , но он не решает мой вопрос должным образом.У меня есть около 100 DataFrames, в каждом из которых по крайней мере 200,000 строк, и мне нужно присоединиться к ним, выполнив full объединение на основе столбца ID, создавая тем самым DataFrame со столбцами - ID, Col1, Col2,Col3,Col4, Col5..., Col102.

Просто для иллюстрации, структура моих DataFrames -

df1 =                          df2 =            df3 =          .....  df100 = 
+----+------+------+------+    +----+------+    +----+------+         +----+------+ 
|  ID|  Col1|  Col2|  Col3|    |  ID|  Col4|    |  ID|  Col5|         |  ID|Col102|
+----+------+-------------+    +----+------+    +----+------+         +----+------+
| 501|  25.1|  34.9| 436.9|    | 501| 22.33|    | 503| 22.33|         | 501|  78,1|
| 502|  12.2|3225.9|  46.2|    | 502| 645.1|    | 505| 645.1|         | 502|  54.9|
| 504| 754.5| 131.0| 667.3|    | 504| 547.2|    | 504| 547.2|         | 507|     0|
| 505|324.12| 48.93|  -1.3|    | 506|     2|    | 506|     2|         | 509| 71.57|
| 506| 27.51| 88.99|  67.7|    | 507| 463.7|    | 507| 463.7|         | 510|  82.1|
.
.
+----+------+------|------|    |----|------|    |----|------|         |----|------|

Я начал объединять эти DataFrames, последовательно full соединяя их все.Естественно, это сложная вычислительная процедура, и нужно стремиться уменьшить количество shuffles на разных рабочих узлах.Поэтому я начал с разделения DataFrame df1 на основе ID с использованием repartition () , который hash-partitions DataFrame на основе ID на 30 разделов -

df1 = df1.repartition(30,'ID')

Теперь я делаю full соединение между df1 и df2.

df = df1.join(df2,['ID'],how='full')
df.persist()

Поскольку df1 уже было hash-partitioned, поэтому я ожидал, что это join выше будет пропущеноперетасовывает и сохраняет partitioner из df1, но я замечаю, что shuffle имел место, и это увеличивало количество разделов на df до 200.Теперь, если я продолжу присоединяться к последующим фреймам данных, вызывая их с помощью функции, показанной ниже, я получаю ошибку java.io.IOException: No space left on device -

def rev(df,num):
     df_temp = spark.read.load(filename+str(num)+'.csv')
     df_temp.persist()
     df = df.join(df_temp,['ID'],how='full')
     df_temp.unpersist()
     return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() - 
print("Total number of rows: "+str(df.count()))
df.unpersist()  # Never reached this stage.

Обновление: Сообщение об ошибке -

Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

Вопросы: 1. Почему разделитель df1 не был сохранен, когда мы сделали первый join?

2.Как я могу присоединиться к этим несколькимтаблицы, а также избежать этой No space left on device проблемы?Пользователь @silvio здесь предлагает использовать .bucketBy () , но он также сослался на факт сохранения разделителя, чего не произошло.Поэтому я не уверен в том, какой эффективный способ объединить эти несколько фреймов данных.

Будем очень благодарны за любые предложения / подсказки.

Ответы [ 2 ]

2 голосов
/ 15 марта 2019

1-я попытка сохранить ваш большой df каждые N итераций с циклом for (что вы, вероятно, уже сделали)

2-я попытка контролировать номер раздела по умолчанию, установив sqlContext.sql("set spark.sql.shuffle.partitions=100") вместо 200, что по умолчанию.

Ваш код должен выглядеть следующим образом:

num_partitions = 10
big_df = spark.createDataFrame(...) #empty df
for i in range(num_partitions):
   big_df = big_df.join(df, ....)

   if i % num_partitions == 0:
     big_df = big_df.persist()

Здесь я призываю сохраняться каждые 10 итераций, вы, конечно, можете настроить это число в соответствии с поведением вашей работы.

EDIT: В вашем случае вы сохраняете локальный df_temp внутри функции rev, но не весь фрейм данных, который содержит все предыдущие объединения (df в вашем случае).Это не будет иметь никакого эффекта в окончательном плане выполнения, поскольку это локальное сохранение.Что касается моего предложения, давайте предположим, что вам нужно в общей сложности 100 объединений, тогда с кодом выше вы должны выполнить итерацию цикла [1..100] и сохранять накопленные результаты каждые 10 итераций.После сохранения большого фрейма данных DAG будет содержать меньше вычислений в памяти, поскольку промежуточные шаги будут сохранены, и Spark знает, как восстановить их из хранилища, а не пересчитывать все с нуля.

1 голос
/ 14 марта 2019

В прошлом у меня была похожая проблема, за исключением того, что у меня не было столько RDD. Самым эффективным решением, которое я смог найти, было использование низкоуровневого API RDD. Сначала сохраните все RDD так, чтобы они были (хэш) разделены и отсортированы в разделах по столбцам столбцов соединения: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html#repartitionAndSortWithinPartitions-org.apache.spark.Partitioner-

После этого соединение может быть реализовано с использованием zip-разделов без перестановки или большого объема памяти: https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/RDD.html#zipPartitions-org.apache.spark.rdd.RDD-boolean-scala.Function2-scala.reflect.ClassTag-scala.reflect.ClassTag-

...