Я соединяю два фрейма данных df1
(15 тыс. Строк) и df2
(с 6 млн. Строк). Я транслировал df1
и репаративно df2
на 20.
И spark.sql.shuffle.partition
по умолчанию установлено на 200. Теперь при объединении этих двух DF данные снова перетасовываются, и я вижу, что 200 задач создается только с 4 разделами, имеющими данные.
В идеале, здесь нет перестановок, поскольку я транслировал небольшой DF. Но все же объединение приводит к 200 задачам с неравными записями. Я ожидаю, что это не должно перераспределять данные, поскольку я уже транслировал таблицу. Как я вижу в плане объяснения, вызывается SortMergeJoin.
def HC_LE_BM(BM, HC_LE):
spark.conf.set('spark.sql.join.preferSortMergeJoin','False')
print(spark.conf.get('spark.sql.join.preferSortMergeJoin'))
df1 = HC_LE
df2 = BM
df2 = df2.repartition(20).cache()
print("Print partition", df2.rdd.getNumPartitions())
print("Storage level", df2.storageLevel)
BENCH_HEIGHT_OFFSET = 25
df3 = df2.join(F.broadcast(df1), (
(df1.SRC_DIG_X >= df2.min_x) &
(df1.SRC_DIG_X < df2.max_x) &
(df1.SRC_DIG_Y >= df2.min_y) &
(df1.SRC_DIG_Y < df2.max_y) &
(df1.SRC_DIG_Z <= df2.min_z) &
((df1.SRC_DIG_Z + BENCH_HEIGHT_OFFSET) >= df2.max_z) &
(df1.ORIGIN == df2.ORIGIN_BM)
),
how = 'right'
)\
.select('*')
df3.explain(True)
df3.show(truncate=False)
Пожалуйста, найдите прикрепленный журнал.
== Физический план == SortMergeJoin [ORIGIN_BM # 6341], [ORIGIN # 6395], RightOuter, ((((((SRC_DIG_X # 6519> = min_x # 6360) && (SRC_DIG_X # 6519 = min_y # 6362)) && (SRC_DIG_Y # 6520 = max_z # 6365)): - * (2) Сортировать [ORIGIN_BM # 6341 AS C NULLS FIRST], false, 0: + - Обмен хэш-разделами (ORIGIN_BM # 6341, 200), true: + - * (1) Фильтр ((((((isnotnull (min_z # 6364) && isnotnull (max_y # 6363)) && isnotnull (max_x # 6361) ) && isnotnull (max_z # 6365)) && isnotnull (min_y # 6362)) && isnotnull (min_x # 6360)) && isnotnull (ORIGIN_BM # 6341)): + - InMemoryTableScan [ORIGIN_BM # 6341, ORIGIN_L3_BID_B3 centroid_x # 6344, centroid_y # 6345, centroid_z # 6346, dim_x # 6347, dim_y # 6348, dim_z # 6349, том # 6350, rktype_now # 6351, rkzone_now # 6352, densnow_2013 # 6353, category_dil # 6354, class_d # 6355, btot_dil # 6356, as_dil # 6357, densnow_2015 # 6358, densnow_rma # 6359, min_x # 6360, max_x # 6361, min_y # 6362, max_y # 6363, min_z # 6364, max_z # 6365], [min_z # 6365], [min_z # 6365], [min_z # 6365], [min_z # 6365], [not_n # 6365], [not_n # 6365], [not_n # 6365], [not_n # 6365], [not_null 6 ) isnotnull (max_y # 6363), isnotnull (max_x # 6361), isnotnull (max_z # 6365), isnotnull (min_y # 6362), isnotnull (min_x # 6360), isnotnull (ORIGIN_BM # 6341)]: + - ORMemIN_lation # 6341, ORIGIN_LOCATION_BENCH_BM # 6342, block_id # 6343, centroid_x # 6344, centroid_y # 6345, centroid_z # 6346, dim_x # 6347, dim_y # 6348, dim_z # 6349, том # 6350, rktype_now # 6351, 6355, 6353, gz5, #355, gz5 # 6353, 6355, gz5, gz5 # 6351, gz5, gz5 # 6351, gz5, gz5, все же , category_dil # 6354, class_dil # 6355, btot_dil # 6356, as_dil # 6357, densnow_2015 # 6358, densnow_rma # 6359, min_x # 6360, max_x # 6361, min_y # 6362, max_y # 6363, min_z # 6364, min_z # 6364, min_z # 6364, min_z # 6364 StorageLevel (диск, память, десериализовано, 1 реплика): + - Exchange RoundRobinPartitioning (20), false: + - (1) FileScan parquet! Ri.foun dry .main.transaction.00000000-e6aa-5004- bd4a-ac6c2019674d: ri.foun dry .main.transaction 00000000-e6aa-5004-bd4a-ac6c2019674d@00000000-4c09-7aa9-9c6c-752a1b9849cd:. master.ri.foun dry .main.dataset.667a 9c8b-dde2-4524-ACAE-7f6c4183141e [ORIGIN_BM # 6341, # 6342 ORIGIN_LOCATION_BENCH_BM, block_id # 6343, # 6344 centroid_x, centroid_y # 6345, # 6346 centroid_z, dim_x # 6347, # 6348 dim_y, dim_z # 6349, # 6350 Объем, rktype_now # 6351, rkzone_now # 6352, densnow_2013 # 6353, category_dil # 6354, class_dil # 6355, btot_dil # 6356, as_dil # 6357, densnow_2015 # 6358, densnow_rma # 6359, min_x # 6360, max_x # 6361, min_y # 6362, max_y # 6363, min_z # 6364, max_z # 6365] Пакетная обработка: true, фильтры данных: [], формат: паркет, расположение: FoundryCatalogFileIndex [sparkfoun dry: //.com/datasets/ri.foundry.main .datase ..., PartitionFilters: [], PressedFilters: [], ReadSchema: struct.com/datasets/ri.foundry.main.datase ..., PartitionFilters: [], PressedFilters: [], ReadSchema: структура