Сортировка таблиц улья после объединения в искру - PullRequest
1 голос
/ 05 апреля 2019

Я запускаю искровую оболочку с искрой 2.3.1 со следующими параметрами:

  • --master='local[*]'
  • --executor-memory=6400M
  • --driver-memory=60G
  • --conf spark.sql.autoBroadcastJoinThreshold=209715200
  • --conf spark.sql.shuffle.partitions=1000
  • --conf spark.local.dir=/data/spark-temp
  • --conf spark.driver.extraJavaOptions='-Dderby.system.home=/data/spark-catalog/'

Затем создайтедве таблицы улья с сортировкой и корзинами

Имя первой таблицы - table1

Имя второй таблицы - table2

val storagePath = "path_to_orc"
val storage = spark.read.orc(storagePath)
val tableName = "table1"

sql(s"DROP TABLE IF EXISTS $tableName")
storage.select($"group", $"id").write.bucketBy(bucketsCount, "id").sortBy("id").saveAsTable(tableName)

(тот же код для table2)

Я ожидал, что когда я присоединяюсь к любой из этих таблиц с другим df, в плане запроса не будет ненужного шага Exchange

Тогда яотключить трансляцию, чтобы использовать SortMergeJoin

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

Я беру немного df

val sample = spark.read.option("header", "true).option("delimiter", "\t").csv("path_to_tsv")

val m = spark.table("table1")
sample.select($"col" as "id").join(m, Seq("id")).explain()

== Physical Plan ==
*(4) Project [id#24, group#0]
+- *(4) SortMergeJoin [id#24], [id#1], Inner
   :- *(2) Sort [id#24 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#24, 1000)
   :     +- *(1) Project [col#21 AS id#24]
   :        +- *(1) Filter isnotnull(col#21)
   :           +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
   +- *(3) Project [group#0, id#1]
      +- *(3) Filter isnotnull(id#1)
         +- *(3) FileScan parquet default.table1[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>

Но когда я использую объединение для двух таблиц перед объединением

val m2 = spark.table("table2")
val mUnion = m union m2
sample.select($"col" as "id").join(mUnion, Seq("id")).explain()

== Physical Plan ==
*(6) Project [id#33, group#0]
+- *(6) SortMergeJoin [id#33], [id#1], Inner
   :- *(2) Sort [id#33 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#33, 1000)
   :     +- *(1) Project [col#21 AS id#33]
   :        +- *(1) Filter isnotnull(col#21)
   :           +- *(1) FileScan csv [col#21] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/samples/sample-20K], PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:string>
   +- *(5) Sort [id#1 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1, 1000)
         +- Union
            :- *(3) Project [group#0, id#1]
            :  +- *(3) Filter isnotnull(id#1)
            :     +- *(3) FileScan parquet default.membership_g043_append[group#0,id#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>
            +- *(4) Project [group#4, id#5]
               +- *(4) Filter isnotnull(id#5)
                  +- *(4) FileScan parquet default.membership_g042[group#4,id#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/data/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<group:string,id:string>

В этом случаепоявилась сортировка и разбиение (шаг 5)

Как объединить две таблицы улья без сортировки и обмена

1 Ответ

1 голос
/ 05 апреля 2019

Насколько я знаю, spark не рассматривает сортировку при объединении, а только разделы. Таким образом, чтобы получить эффективные объединения, вы должны разделить по одному столбцу. Это связано с тем, что сортировка не гарантирует, что записи с одинаковым ключом попадут в один и тот же раздел. Spark должен убедиться, что все ключи с одинаковыми значениями перетасованы в один и тот же раздел и на одного исполнителя из нескольких фреймов данных.

...