При тестировании производственного сценария использования я создал и сохранил (используя Hive Metastore) такие таблицы:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
Я выполняю такой запрос (в псевдокоде)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
Здравый смысл говорит, что это объединение должно быть просто выполнено с объединением сортировки-слияния без обмена;однако spark выполняет обмен, а затем присоединяется.
Несмотря на то, что для этого конкретного варианта использования я мог бы использовать оба ключа из-за некоторых других сценариев использования, которые мне нужно объединить в ключ key1.И когда я выполняю (более простое) объединение, используя один ключ, подобный следующему:
table1.join(table2, [“key1”])
Это работает как ожидалось (т.е. объединение с сортировкой-слиянием без обмена).
Теперь, когда у меня есть оптимизированное соединение для этих таблиц, если я хочу отфильтровать его следующим образом:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
Он возвращается к обмену, а затем присоединяется.
Как я могу убедить спарк не производить обмен, если ключ соединения является супер-набором ключа bucketBy?
Примечание:
Один известный мне трюк - вместо проверки на равенство, если япереписал бы как проверки неравенства, искра не перемешалась бы.
(x == y) также может быть выражено как ((x> = y) & (x <= y)).Если бы я применил два фильтра, как это в последнем примере: </p>
.filter (table1.col («key2»)>> = table2.col («key2»))
.filter (table1.col («key2») <= table2.col («key2»)) </p>
Он продолжит использовать соединение сортировки-слияния без обмена, однако это не решение,это хак.