Допустим, у меня есть следующий DataFrame:
+----------+------------+-------------------+--------+----------+
|IDENTIFIER|NEXT_RECORDS|TYPE |POSITION|FRUIT |
+----------+------------+-------------------+--------+----------+
|1_1 |[3_1] |Ready for next |E |Apple |
|2_1 |[3_1] |Ready for next |E |Apple |
|3_1 |[4_1] |Ready from previous|X |Lemon |
|3_1 |[5_1] |Ready from previous|X |Lemon |
|4_1 |[6_1] |Ready for next |X |Orange |
|5_1 |[7_1] |Ready for next |X |Orange |
|6_1 |[8_1] |Ready from previous|X |Strawberry|
|7_1 |[8_1] |Ready from previous|X |Strawberry|
|8_1 |[] |Ready for next |X |Pineapple |
|9_1 |[10_1] |Ready for next |E |Cherry |
|10_1 |[] |Ready from previous|X |Orange |
+----------+------------+-------------------+--------+----------+
В этой таблице представлены некоторые движения, связанные друг с другом на основе столбца «NEXT_RECORDS». Я хотел бы добавить дополнительный столбец, в котором рассказывается, какой был первый «ФРУКТ» из всей моей цепочки. Я знаю, что все движения с TYPE = "Ready to next" AND POSITION = "E" являются началом цепочки. Поэтому в основном я пытаюсь достичь чего-то вроде:
+----------+------------+-------------------+--------+----------+-----------+
|IDENTIFIER|NEXT_RECORDS|TYPE |POSITION|FRUIT |FIRST_FRUIT|
+----------+------------+-------------------+--------+----------+-----------+
|1_1 |[3_1] |Ready for next |E |Apple |Apple |
|2_1 |[3_1] |Ready for next |E |Apple |Apple |
|3_1 |[4_1] |Ready from previous|X |Lemon |Apple |
|3_1 |[5_1] |Ready from previous|X |Lemon |Apple |
|4_1 |[6_1] |Ready for next |X |Orange |Apple |
|5_1 |[7_1] |Ready for next |X |Orange |Apple |
|6_1 |[8_1] |Ready from previous|X |Strawberry|Apple |
|7_1 |[8_1] |Ready from previous|X |Strawberry|Apple |
|8_1 |[] |Ready for next |X |Pineapple |Apple |
|9_1 |[10_1] |Ready for next |E |Cherry |Cherry |
|10_1 |[] |Ready from previous|X |Orange |Cherry |
+----------+------------+-------------------+--------+----------+-----------+
В этом примере движения 1_1 и 2_1 являются началом цепочки, а 9_1 - началом цепочки. Итак, первый фрукт для первого движения - это яблоко, а первый фрукт для второго - это вишня.
Сейчас я уже кое-что попробовал, но это немного сложно:
Первый разделение мой ввод основан на типе:
val readyForNext = input.filter(col("TYPE") === "Ready for next")
val readyFromPrevious = input.filter(col("TYPE") === "Ready from previous")
Я получаю движения в начале цепочки:
val firstRecords = readyForNext.filter(col("POSITION") === "E")
.withColumn("FIRST_FRUIT", col("FRUIT"))
Это будет мой начальный DataFrame. Я создал рекурсивную функцию, которая начинается с этого DataFrame, объединяет следующие движения и добавляет первый плод к этим записям:
var newCount = 0L
var counter = 0
var matchesFound = true
val columns = input.columns
def getMultipleLinks(df: DataFrame): DataFrame = {
val oldResultCount = newCount
val intermediate = if (counter % 2 == 0) {
link(df, readyFromPrevious, columns)
} else {
link(df, readyForNext, columns)
}
counter += 1
newCount = intermediate.filter(col("FIRST_FRUIT").isNull).count
matchesFound = oldResultCount != newCount
if (matchesFound) {
val matches = getMultipleLinks(intermediate)
matches
} else {
intermediate
}
}
def link(from: DataFrame, to: DataFrame, columns: Array[String]): DataFrame = {
val fromWithJoinKey = from.withColumn("JOIN_KEY", explode(col("NEXT_RECORDS")))
val toWithJoinKey = to.withColumn("JOIN_KEY", col("IDENTIFIER"))
fromWithJoinKey
.as("left")
.join(toWithJoinKey.as("right"), Seq("JOIN_KEY"), "right")
.select("right.*", "left.FIRST_FRUIT")
.distinct
.withColumn("FIRST_FRUIT", when(col("FIRST_FRUIT").isNull, first("FIRST_FRUIT", ignoreNulls = true).over(Window.partitionBy("NEXT_RECORDS"))).otherwise(col("FIRST_FRUIT")))
.drop("JOIN_KEY")
.union(fromWithJoinKey.drop("JOIN_KEY"))
.groupBy(columns.map(col): _*)
.agg(
first("FIRST_FRUIT", ignoreNulls = true).as("FIRST_FRUIT")
)
}
getMultipleLinks(firstRecords).show(20, false)
Это дает мне правильный результат, но очень неэффективно! При выполнении этого на большом наборе данных это займет несколько часов Spark и, в конечном итоге, приведет к ошибке Java Heap Space.
Поэтому мой вопрос заключается в том, как улучшить мой текущий код или есть какой-то другой способ связать движения без использования рекурсивной функции? И если другого пути нет, поможет ли кэширование промежуточного DataFrame внутри моей рекурсивной функции?