Мне нужно сделать левое соединение между основным фреймом данных и несколькими опорными фреймами, так что вычисление цепного соединения.И мне интересно, как сделать это действие эффективным и масштабируемым.
Метод 1 прост для понимания, который также является текущим методом, но я не удовлетворен, потому что все преобразования были объединены в цепочку и ожидали финальногодействие для запуска вычисления, если я продолжу добавлять преобразование и объем данных, в конце произойдет сбой spark, поэтому этот метод не масштабируется.
Метод 1:
def pipeline(refDF1: DataFrame, refDF2: DataFrame, refDF3: DataFrame, refDF4: DataFrame, refDF5: DataFrame): DataFrame = {
val transformations: List[DataFrame => DataFrame] = List(
castColumnsFromStringToLong(ColumnsToCastToLong),
castColumnsFromStringToFloat(ColumnsToCastToFloat),
renameColumns(RenameMapping),
filterAndDropColumns,
joinRefDF1(refDF1),
joinRefDF2(refDF2),
joinRefDF3(refDF3),
joinRefDF4(refDF4),
joinRefDF5(refDF5),
calculate()
)
transformations.reduce(_ andThen _)
}
pipeline(refDF1, refDF2, refDF3, refDF4, refDF5)(principleDF)
Метод 2: Я не нашел реального способа реализовать свою идею, но я надеюсь немедленно запустить вычисление каждого соединения.
в соответствии с моим тестом count () слишком тяжел для искры и бесполезен длямое приложение, но я не знаю, как запустить вычисление соединения с эффективным действием .Такого рода действия фактически являются ответом на этот вопрос.
val joinedDF_1 = castColumnsFromStringToLong(principleDF, ColumnsToCastToLong)
joinedDF_1.cache() // joinedDF is not always used multiple times, but for some data frame, it is, so I add cache() to indicate the usage
joinedDF_1.count()
val joinedDF_2 = castColumnsFromStringToFloat(joinedDF_1, ColumnsToCastToFloat)
joinedDF_2.cache()
joinedDF_2.count()
val joinedDF_3 = renameColumns(joinedDF_2, RenameMapping)
joinedDF_3.cache()
joinedDF_3.count()
val joinedDF_4 = filterAndDropColumns(joinedDF_4)
joinedDF_4.cache()
joinedDF_4.count()
...