Я использовал Bucketing при объединении двух таблиц, но обмен все еще происходит. Я не уверен, что я делаю неправильно.
Я прошел - https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html и другие блоги, в которых рассказывается о ведении.
Я подтвердил ниже вещи -
- Количество разделов на обеих сторонах объединения должно быть одинаковым.
bucketedTable1.queryExecution.toRdd.getNumPartitions
bucketedTable1.queryExecution.toRdd.getNumPartitions
Оба приведенных выше запроса приводят к 1000.
Используется схема разбиения HashPartitioning.
Я присоединяюсь к таблице на основе тех же ключей, которые использовались для группирования.
Я проверил, что ведение включено.
spark.sessionState.conf.bucketingEnabled привел к значению true.
val table1 = sql("select * from table1")
val table2 = sql("select * from table2")
table1.write.bucketBy(1000, "col1", "col2", "col3", "col4").mode(SaveMode.Overwrite).saveAsTable("table1BucketedTableName")
table2.write.bucketBy(1000, "col5", "col6", "col3", "col4").mode(SaveMode.Overwrite).saveAsTable("table2BucketedTableName")
val bucketedTable1 = spark.table("table1BucketedTableName")
val bucketedTable2 = spark.table("table2BucketedTableName")
val joinedTable = bucketedTable1.join(bucketedTable2, bucketedTable1("col1") === bucketedTable2("col5") && bucketedTable1("col2") === bucketedTable2("col6") && bucketedTable1("col3") === bucketedTable2("col3") && bucketedTable1("col4") === bucketedTable2("col4"))
joinedTable.show(false)
Ожидаемый результат - обмен не должен происходить.
Но когда я проверяю интерфейс Spark, я вижу, как происходит обмен.