Для кэшированного фрейма данных, разбитого на разделы и отсортированного по разделам, я получаю хорошую производительность при запросе ключа с предложением where
, но низкую производительность при выполнении соединения с небольшой таблицей на том же ключе.
См. Пример набора данных dftest
ниже с 10Kx44K = 438M строк.
sqlContext.sql(f'set spark.sql.shuffle.partitions={32}')
sqlContext.clearCache()
sc.setCheckpointDir('/checkpoint/temp')
import datetime
from pyspark.sql.functions import *
from pyspark.sql import Row
start_date = datetime.date(1900, 1, 1)
end_date = datetime.date(2020, 1, 1)
dates = [ start_date + datetime.timedelta(n) for n in range(int ((end_date - start_date).days))]
dfdates=spark.createDataFrame(list(map(lambda x: Row(date=x), dates))) # some dates
dfrange=spark.createDataFrame(list(map(lambda x: Row(number=x), range(10000)))) # some number range
dfjoin = dfrange.crossJoin(dfdates)
dftest = dfjoin.withColumn("random1", round(rand()*(10-5)+5,0)).withColumn("random2", round(rand()*(10-5)+5,0)).withColumn("random3", round(rand()*(10-5)+5,0)).withColumn("random4", round(rand()*(10-5)+5,0)).withColumn("random5", round(rand()*(10-5)+5,0)).checkpoint()
dftest = dftest.repartition("number").sortWithinPartitions("number", "date").cache()
dftest.count() # 438,290,000 rows
Следующий запрос теперь занимает примерно секунду (в небольшом кластере с 2 работниками):
dftest.where("number = 1000 and date = \"2001-04-04\"").count()
Однако, когда я пишу подобное условие в качестве соединения, это занимает 2 минуты:
dfsub = spark.createDataFrame([(10,"1900-01-02",1),
(1000,"2001-04-04",2),
(4000,"2002-05-05",3),
(5000,"1950-06-06",4),
(9875,"1980-07-07",5)],
["number","date", "dummy"]).repartition("number").sortWithinPartitions("number", "date").cache()
df_result = dftest.join(dfsub, ( dftest.number == dfsub.number ) & ( dftest.date == dfsub.date ), 'inner').cache()
df_result.count() # takes 2 minutes (result = 5)
Я бы ожидал, что это будет примерно одинаково быстро. Тем более, что я хотел бы надеяться, что больший массив данных уже кластеризован и кэширован. Глядя на план:
== Physical Plan ==
InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L]
+- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(3) SortMergeJoin [number#771L, cast(date#769 as string)], [number#945L, date#946], Inner
:- *(1) Sort [number#771L ASC NULLS FIRST, cast(date#769 as string) ASC NULLS FIRST], false, 0
: +- *(1) Filter (isnotnull(number#771L) && isnotnull(date#769))
: +- InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], [isnotnull(number#771L), isnotnull(date#769)]
: +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(number#771L, 32)
: +- *(1) Scan ExistingRDD[number#771L,date#769,random1#775,random2#779,random3#784,random4#790,random5#797]
+- *(2) Filter (isnotnull(number#945L) && isnotnull(date#946))
+- InMemoryTableScan [number#945L, date#946, dummy#947L], [isnotnull(number#945L), isnotnull(date#946)]
+- InMemoryRelation [number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Sort [number#945L ASC NULLS FIRST, date#946 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(number#945L, 32)
+- *(1) Scan ExistingRDD[number#945L,date#946,dummy#947L]
Кажется, что много времени уходит на сортировку большего кадра данных по номеру и дате (эта строка: Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0
). Это оставляет меня со следующими вопросами:
- внутри разделов, порядок сортировки как для левой, так и для правой стороны абсолютно одинаков и оптимален для предложения JOIN, почему Spark все еще сортирует разделы снова?
- , поскольку 5 записей соединения соответствуют (до) 5 разделам, почему оцениваются все разделы?
- Кажется, что Catalyst не использует информацию
repartition
и sortWithinPartitions
кэшированного фрейма данных. Имеет ли смысл использовать sortWithinPartitions
в подобных случаях?