Spark сортирует уже отсортированные разделы, что приводит к снижению производительности - PullRequest
0 голосов
/ 11 июля 2019

Для кэшированного фрейма данных, разбитого на разделы и отсортированного по разделам, я получаю хорошую производительность при запросе ключа с предложением where, но низкую производительность при выполнении соединения с небольшой таблицей на том же ключе.

См. Пример набора данных dftest ниже с 10Kx44K = 438M строк.

sqlContext.sql(f'set spark.sql.shuffle.partitions={32}')
sqlContext.clearCache()
sc.setCheckpointDir('/checkpoint/temp')
import datetime
from pyspark.sql.functions import *
from pyspark.sql import Row

start_date = datetime.date(1900, 1, 1)
end_date   = datetime.date(2020, 1, 1)

dates = [ start_date + datetime.timedelta(n) for n in range(int ((end_date - start_date).days))]

dfdates=spark.createDataFrame(list(map(lambda x: Row(date=x), dates))) # some dates
dfrange=spark.createDataFrame(list(map(lambda x: Row(number=x), range(10000)))) # some number range

dfjoin = dfrange.crossJoin(dfdates)
dftest = dfjoin.withColumn("random1", round(rand()*(10-5)+5,0)).withColumn("random2", round(rand()*(10-5)+5,0)).withColumn("random3", round(rand()*(10-5)+5,0)).withColumn("random4", round(rand()*(10-5)+5,0)).withColumn("random5", round(rand()*(10-5)+5,0)).checkpoint()
dftest = dftest.repartition("number").sortWithinPartitions("number", "date").cache()
dftest.count() # 438,290,000 rows

Следующий запрос теперь занимает примерно секунду (в небольшом кластере с 2 работниками):

dftest.where("number = 1000 and date = \"2001-04-04\"").count()

Однако, когда я пишу подобное условие в качестве соединения, это занимает 2 минуты:

dfsub = spark.createDataFrame([(10,"1900-01-02",1),
  (1000,"2001-04-04",2),
  (4000,"2002-05-05",3),
  (5000,"1950-06-06",4),
  (9875,"1980-07-07",5)],
["number","date", "dummy"]).repartition("number").sortWithinPartitions("number", "date").cache()
df_result = dftest.join(dfsub, ( dftest.number == dfsub.number ) & ( dftest.date == dfsub.date ), 'inner').cache()
df_result.count() # takes 2 minutes (result = 5)

Я бы ожидал, что это будет примерно одинаково быстро. Тем более, что я хотел бы надеяться, что больший массив данных уже кластеризован и кэширован. Глядя на план:

== Physical Plan ==
InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L]
   +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) SortMergeJoin [number#771L, cast(date#769 as string)], [number#945L, date#946], Inner
            :- *(1) Sort [number#771L ASC NULLS FIRST, cast(date#769 as string) ASC NULLS FIRST], false, 0
            :  +- *(1) Filter (isnotnull(number#771L) && isnotnull(date#769))
            :     +- InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], [isnotnull(number#771L), isnotnull(date#769)]
            :           +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                 +- Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0
            :                    +- Exchange hashpartitioning(number#771L, 32)
            :                       +- *(1) Scan ExistingRDD[number#771L,date#769,random1#775,random2#779,random3#784,random4#790,random5#797]
            +- *(2) Filter (isnotnull(number#945L) && isnotnull(date#946))
               +- InMemoryTableScan [number#945L, date#946, dummy#947L], [isnotnull(number#945L), isnotnull(date#946)]
                     +- InMemoryRelation [number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- Sort [number#945L ASC NULLS FIRST, date#946 ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(number#945L, 32)
                                 +- *(1) Scan ExistingRDD[number#945L,date#946,dummy#947L]

Кажется, что много времени уходит на сортировку большего кадра данных по номеру и дате (эта строка: Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0). Это оставляет меня со следующими вопросами:

  • внутри разделов, порядок сортировки как для левой, так и для правой стороны абсолютно одинаков и оптимален для предложения JOIN, почему Spark все еще сортирует разделы снова?
  • , поскольку 5 записей соединения соответствуют (до) 5 разделам, почему оцениваются все разделы?
  • Кажется, что Catalyst не использует информацию repartition и sortWithinPartitions кэшированного фрейма данных. Имеет ли смысл использовать sortWithinPartitions в подобных случаях?

1 Ответ

1 голос
/ 16 июля 2019

Позвольте мне попытаться ответить на ваши три вопроса:

в пределах разделов, порядок сортировки для левой и правой сторон абсолютно одинаков и оптимален для предложения JOIN, почему Sparkвсе еще сортируете разделы снова?

Порядок сортировки в обоих DataFrames НЕ одинаков, потому что разные типы данных в столбце сортировки date, в dfsub это StringType и вdftest это DateType, поэтому во время объединения Spark видит, что порядок в обеих ветвях различен и, таким образом, вынуждает Sort.

, поскольку 5 записей о соединении совпадают (до)5 разделов, почему оцениваются все разделы?

Во время обработки плана запроса Spark не знает, сколько разделов не пусто в небольшом DataFrame, и поэтому ему необходимо обработать все из них.

Кажется, что Catalyst не использует информацию перераспределения и sortWithinPartitions кэшированного фрейма данных.Имеет ли смысл использовать sortWithinPartitions в подобных случаях?

Оптимизатор Spark использует информацию из repartition и sortWithinPartitions, но есть некоторые предостережения о том, как это работает.Чтобы исправить ваш запрос, также важно перераспределить по тем же столбцам (обоим), которые вы используете в объединении (не только по одному столбцу).В принципе, это не должно быть необходимым, и сейчас идет попытка решить эту проблему jira .

Итак, вот мои предлагаемые изменения в вашем запросе:

  1. Измените тип столбца date на StringType в dftest (или аналогично измените на DateType в dfsub):

    dftest.withColumn("date", col("date").cast('string'))
    
  2. В обоих кадрах данныхизмените

    .repartition("number")
    

    на

    .repartition("number", "date")
    

После этих изменений вы должны получить план, подобный следующему:

*(3) SortMergeJoin [number#1410L, date#1653], [number#1661L, date#1662], Inner
:- Sort [number#1410L ASC NULLS FIRST, date#1653 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(number#1410L, date#1653, 200)
:     +- *(1) Project [number#1410L, cast(date#1408 as string) AS date#1653, random1#1540, random2#1544, random3#1549, random4#1555, random5#1562]
:        +- *(1) Filter (isnotnull(number#1410L) && isnotnull(cast(date#1408 as string)))
:           +- *(1) Scan ExistingRDD[number#1410L,date#1408,random1#1540,random2#1544,random3#1549,random4#1555,random5#1562]
+- Sort [number#1661L ASC NULLS FIRST, date#1662 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(number#1661L, date#1662, 200)
      +- *(2) Filter (isnotnull(number#1661L) && isnotnull(date#1662))
         +- *(2) Scan ExistingRDD[number#1661L,date#1662,dummy#1663L]

, так что есть толькоодин Exchange и один Sort в каждой ветви плана, оба из них происходят из repartition и sortWithinPartition, которые вы вызываете в своих преобразованиях, и объединение не вызывает больше сортировки или перемешивания.Также обратите внимание, что в моем плане нет InMemoryTableScan, так как я не использовал кеш.

...