Как я могу убедить спарк не производить обмен, если ключ соединения является супер-набором ключа bucketBy? - PullRequest
2 голосов
/ 26 июня 2019

При тестировании производственного сценария использования я создал и сохранил (используя Hive Metastore) такие таблицы:

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets

Я выполняю такой запрос (в псевдокоде)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)

Здравый смысл говорит, что это объединение должно быть просто выполнено с объединением сортировки-слияния без обмена;однако spark выполняет обмен, а затем присоединяется.

Несмотря на то, что для этого конкретного варианта использования я мог бы использовать оба ключа из-за некоторых других сценариев использования, которые мне нужно объединить в ключ key1.И когда я выполняю (более простое) объединение, используя один ключ, подобный следующему:

table1.join(table2, [“key1”])

Это работает как ожидалось (т.е. объединение с сортировкой-слиянием без обмена).

Теперь, когда у меня есть оптимизированное соединение для этих таблиц, если я хочу отфильтровать его следующим образом:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))

Он возвращается к обмену, а затем присоединяется.

Как я могу убедить спарк не производить обмен, если ключ соединения является супер-набором ключа bucketBy?

Примечание:

Один известный мне трюк - вместо проверки на равенство, если япереписал бы как проверки неравенства, искра не перемешалась бы.

(x == y) также может быть выражено как ((x> = y) & (x <= y)).Если бы я применил два фильтра, как это в последнем примере: </p>

.filter (table1.col («key2»)>> = table2.col («key2»))

.filter (table1.col («key2») <= table2.col («key2»)) </p>

Он продолжит использовать соединение сортировки-слияния без обмена, однако это не решение,это хак.

Ответы [ 3 ]

1 голос
/ 23 июля 2019

Судя по некоторым исследованиям, это наименее хакерское решение:

Опираясь на этот пример:

table1.join(table2, [“key1”])
      .filter(table1.col(“key2”) == table2.col(“key2”))

Вместо использования equalTo (==) от Spark, реализация пользовательской MyEqualTo (делегирование реализации spark EqualTo в порядке), похоже, решает проблему. Таким образом, spark не оптимизирует (!) Соединение, а просто вытянет фильтр в SortMergeJoin.

Аналогично, условие соединения также может быть сформировано так:

(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
0 голосов
/ 25 июля 2019

** на основе вашего псевдокода **

table1.join (table2, [«key1», «key2»]) .groupBy («value2») .countUnique («key1»)

Я думаю, что решение будет

, в качестве первого шага просто объедините таблицы и получите фрейм данных.

df = table1.join(table2, [“key1”, “key2”])

, затем сгруппируйте пои делать разные подсчеты

df.select(“value2”,“key1”).distinct().groupBy(“value2”,“key1”).count().show()
0 голосов
/ 24 июля 2019

org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin - это правило оптимизатора, которое проталкивает предикат через соединение.~~
Мы можем исключить это правило из правил оптимизатора.Таким образом, нам не нужно вносить какие-либо изменения в код пользователя.
Чтобы исключить, мы можем сделать одно из следующих действий:
1. --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin.
2. добавить свойство в spark-defaults.conf.
3. добавьте set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin к коду пользователя.

Опять же, это снова взлом. .
В идеале, фильтры должны быть пропущены через соединение, что уменьшает количество строк, которые нужно объединить

Обновление: .
1. Я ошибся насчет нажатия. не будет нажатия на фильтр , так как предикат содержит столбцы из обеих таблиц.
2. Почему SortMergeJoin (SMJ) не добавляет дополнительные обмены, когда предложение , где имеетпредикат "неравенства"?
Это потому, что SMJ может рассматривать только предикаты на основе равенства как часть условия соединения org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys # unapply

И EnsureRequirements, ответственный за добавление обмена, видит, что SMJ не имеет нового условия соединения и что распределение вывода уже выполнено.
код: org.apache.spark.sql.execution.exchange.EnsureRequirements #ureDistributionAndOrdering .
3. Что эффективно - добавление UDF, который выполняет равно или представляет предикат как комбинацию больше или меньше? .
Чтобы оценить это,Я проверил сгенерированный код, используя,

val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen

a.UDF равно - включает дополнительные накладные расходы на вызовы виртуальных функций.
b.комбинация «меньше» и «больше -» - никаких вызовов виртуальных функций.Как только мы находим объединенную строку (используя key1), код проверяет другие предикаты один за другим.

Из приведенных выше наблюдений в 3 использование предиката, основанного на неравенстве, кажется более эффективным.

...