Spark присоединиться * без * перемешивания - PullRequest
0 голосов
/ 08 декабря 2018

Я пытаюсь оптимизировать работу приложения Spark.

Я пытался понять смысл этого вопроса: Как избежать перемешивания при объединении DataFrames с уникальными ключами?

  1. Я убедился, что ключи, на которых должна выполняться операция объединения, распределены в пределах одного и того же раздела (с помощью моего пользовательского разделителя).

  2. Я также не могусделайте широковещательное соединение, потому что мои данные могут быть большими в зависимости от ситуации.

  3. В ответе на вышеупомянутый вопрос перераспределение только оптимизирует объединение, но мне нужно соединение без БЕЗ ТОРГОВ.Я просто в порядке с операцией соединения с помощью ключей в разделе.

Возможно ли это?Я хочу реализовать что-то вроде joinperpartition, если подобная функциональность не существует.

Ответы [ 2 ]

0 голосов
/ 10 декабря 2018

просто дополнение к ранее хорошим ответам.Если вы несколько раз объединяете большой фрейм данных в своем приложении pyspark, сохраните эту таблицу в виде таблиц с пакетами и прочитайте их обратно в pyspark как фрейм данных.таким образом, вы можете избежать нескольких перемешиваний во время объединения, так как данные уже предварительно перемешаны и отсортированы.

, поэтому, когда Spark выбирает сортировку слиянием и объединением на двух больших фреймах данных, он пропускает фазу сортировки и перемешивания во время операций объединения.(Вы можете подтвердить это в пользовательском интерфейсе spark, глядя на wholecodegen)

df_data_1.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table1')

df_data_2.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table2')

df_bucket_table_1 = spark.table("bucketed_table1");
df_bucket_table_2 = spark.table("bucketed_table2");

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.join.preferSortMergeJoin","true")

 #creating alias for the dataframes:
from pyspark.sql.functions import *

df1 = df_bucket_table_1.alias('df1')
df2 = df_bucket_table_2.alias('df2')


DfInnerJoin = df1.join(df2, df1.joincolumn == df2.joincolumn,'inner').select('df1.*')

Приведенное выше объединение не будет тасоваться, но это полезно только в том случае, если вам необходимо присоединиться к одному и тому же фрейму данных по всему приложению несколько раз.

0 голосов
/ 08 декабря 2018

перераспределение только оптимизирует объединение, но мне нужно только объединение без перестановки

Это не так.Передел не только «оптимизирует» объединение.Передел связывает Partitioner с вашим RDD, который является ключевым компонентом для соединения на стороне карты.

Я удостоверился, что ключи, на которых должна выполняться операция объединения, распределены в одном и том же разделе

Spark должен знать об этом.Создайте свои DataFrames с соответствующими API-интерфейсами, чтобы они имели одинаковый Partitioner, и остальное позаботится об остальном.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...