Я выполняю внутреннее соединение, скажем, с 8 кадрами данных, все из одного и того же родителя.Пример кода:
// read parquet
val readDF = session.read.parquet(...)
// multiple expensive transformations are performed over readDF, making its DAG grow
// repartition + cache
val df = readDF.repartition($"type").cache
val df1 = df.filter($"type" === 1)
val df2 = df.filter($"type" === 2)
val df3 = df.filter($"type" === 3)
val df4 = df.filter($"type" === 4)
val df5 = df.filter($"type" === 5)
val df6 = df.filter($"type" === 6)
val df7 = df.filter($"type" === 7)
val df8 = df.filter($"type" === 8)
val joinColumns = Seq("col1", "col2", "col3", "col4")
val joinDF = df1
.join(df2, joinColumns)
.join(df3, joinColumns)
.join(df4, joinColumns)
.join(df5, joinColumns)
.join(df6, joinColumns)
.join(df7, joinColumns)
.join(df8, joinColumns)
Неожиданно предложение joinDF
занимает много времени.Присоединение должно быть преобразованием, а не действием.
Знаете ли вы, что происходит?Это вариант использования для контрольной точки?
Примечания: - joinDF.explain
показывает длинную линию DAG.- используя Spark 2.3.0 с Scala