Цепные записи в DataFrame с помощью рекурсивной функции - PullRequest
1 голос
/ 29 апреля 2020

Допустим, у меня есть следующий DataFrame:

+----------+------------+-------------------+--------+----------+
|IDENTIFIER|NEXT_RECORDS|TYPE               |POSITION|FRUIT     |
+----------+------------+-------------------+--------+----------+
|1_1       |[3_1]       |Ready for next     |E       |Apple     |
|2_1       |[3_1]       |Ready for next     |E       |Apple     |
|3_1       |[4_1]       |Ready from previous|X       |Lemon     |
|3_1       |[5_1]       |Ready from previous|X       |Lemon     |
|4_1       |[6_1]       |Ready for next     |X       |Orange    |
|5_1       |[7_1]       |Ready for next     |X       |Orange    |
|6_1       |[8_1]       |Ready from previous|X       |Strawberry|
|7_1       |[8_1]       |Ready from previous|X       |Strawberry|
|8_1       |[]          |Ready for next     |X       |Pineapple |
|9_1       |[10_1]      |Ready for next     |E       |Cherry    |
|10_1      |[]          |Ready from previous|X       |Orange    |
+----------+------------+-------------------+--------+----------+

В этой таблице представлены некоторые движения, связанные друг с другом на основе столбца «NEXT_RECORDS». Я хотел бы добавить дополнительный столбец, в котором рассказывается, какой был первый «ФРУКТ» из всей моей цепочки. Я знаю, что все движения с TYPE = "Ready to next" AND POSITION = "E" являются началом цепочки. Поэтому в основном я пытаюсь достичь чего-то вроде:

+----------+------------+-------------------+--------+----------+-----------+
|IDENTIFIER|NEXT_RECORDS|TYPE               |POSITION|FRUIT     |FIRST_FRUIT|
+----------+------------+-------------------+--------+----------+-----------+
|1_1       |[3_1]       |Ready for next     |E       |Apple     |Apple      |
|2_1       |[3_1]       |Ready for next     |E       |Apple     |Apple      |
|3_1       |[4_1]       |Ready from previous|X       |Lemon     |Apple      |
|3_1       |[5_1]       |Ready from previous|X       |Lemon     |Apple      |
|4_1       |[6_1]       |Ready for next     |X       |Orange    |Apple      |
|5_1       |[7_1]       |Ready for next     |X       |Orange    |Apple      |
|6_1       |[8_1]       |Ready from previous|X       |Strawberry|Apple      |
|7_1       |[8_1]       |Ready from previous|X       |Strawberry|Apple      |
|8_1       |[]          |Ready for next     |X       |Pineapple |Apple      |
|9_1       |[10_1]      |Ready for next     |E       |Cherry    |Cherry     |
|10_1      |[]          |Ready from previous|X       |Orange    |Cherry     |
+----------+------------+-------------------+--------+----------+-----------+ 

В этом примере движения 1_1 и 2_1 являются началом цепочки, а 9_1 - началом цепочки. Итак, первый фрукт для первого движения - это яблоко, а первый фрукт для второго - это вишня.

Сейчас я уже кое-что попробовал, но это немного сложно:

Первый разделение мой ввод основан на типе:

val readyForNext = input.filter(col("TYPE") === "Ready for next")
val readyFromPrevious = input.filter(col("TYPE") === "Ready from previous")

Я получаю движения в начале цепочки:

val firstRecords = readyForNext.filter(col("POSITION") === "E")
      .withColumn("FIRST_FRUIT", col("FRUIT"))

Это будет мой начальный DataFrame. Я создал рекурсивную функцию, которая начинается с этого DataFrame, объединяет следующие движения и добавляет первый плод к этим записям:

var newCount = 0L
var counter = 0
var matchesFound = true
val columns = input.columns

def getMultipleLinks(df: DataFrame): DataFrame = {
   val oldResultCount = newCount

   val intermediate = if (counter % 2 == 0) {
     link(df, readyFromPrevious, columns)
   } else {
     link(df, readyForNext, columns)
   }

   counter += 1

   newCount = intermediate.filter(col("FIRST_FRUIT").isNull).count
   matchesFound = oldResultCount != newCount

   if (matchesFound) {
      val matches = getMultipleLinks(intermediate)
      matches
   } else {
      intermediate
   }
}

def link(from: DataFrame, to: DataFrame, columns: Array[String]): DataFrame = {
   val fromWithJoinKey = from.withColumn("JOIN_KEY", explode(col("NEXT_RECORDS")))
   val toWithJoinKey = to.withColumn("JOIN_KEY", col("IDENTIFIER"))

   fromWithJoinKey
     .as("left")
     .join(toWithJoinKey.as("right"), Seq("JOIN_KEY"), "right")
     .select("right.*", "left.FIRST_FRUIT")
     .distinct
     .withColumn("FIRST_FRUIT", when(col("FIRST_FRUIT").isNull, first("FIRST_FRUIT", ignoreNulls = true).over(Window.partitionBy("NEXT_RECORDS"))).otherwise(col("FIRST_FRUIT")))
     .drop("JOIN_KEY")
     .union(fromWithJoinKey.drop("JOIN_KEY"))
     .groupBy(columns.map(col): _*)
     .agg(
        first("FIRST_FRUIT", ignoreNulls = true).as("FIRST_FRUIT")
         )
    }

    getMultipleLinks(firstRecords).show(20, false)

Это дает мне правильный результат, но очень неэффективно! При выполнении этого на большом наборе данных это займет несколько часов Spark и, в конечном итоге, приведет к ошибке Java Heap Space.

Поэтому мой вопрос заключается в том, как улучшить мой текущий код или есть какой-то другой способ связать движения без использования рекурсивной функции? И если другого пути нет, поможет ли кэширование промежуточного DataFrame внутри моей рекурсивной функции?

1 Ответ

0 голосов
/ 29 апреля 2020

Попробуйте это. Убедитесь, что ваш IDENTIFIER можно сортировать.

Сначала я создал новый столбец FF_INTER, основываясь на условии, что TYPE готов к следующему, а ПОЛОЖЕНИЕ - E, в котором будет Fruit, в противном случае - null. Затем используйте окно, чтобы найти последний не-нуль, используя функцию last, когда заказано IDENTIFIER

scala> df.show(false)
+----------+------------+-------------------+--------+----------+
|IDENTIFIER|NEXT_RECORDS|TYPE               |POSITION|FRUIT     |
+----------+------------+-------------------+--------+----------+
|11        |[31]        |Ready for next     |E       |Apple     |
|21        |[31]        |Ready for next     |E       |Apple     |
|31        |[41]        |Ready from previous|X       |Lemon     |
|31        |[51]        |Ready from previous|X       |Lemon     |
|41        |[61]        |Ready for next     |X       |Orange    |
|51        |[71]        |Ready for next     |X       |Orange    |
|61        |[81]        |Ready from previous|X       |Strawberry|
|71        |[81]        |Ready from previous|X       |Strawberry|
|81        |[]          |Ready for next     |X       |Pineapple |
|91        |[101]       |Ready for next     |E       |Cherry    |
|101       |[]          |Ready from previous|X       |Orange    |
+----------+------------+-------------------+--------+----------+


scala> 

scala> val dfFirstFruit = df.withColumn("FF_INTER", 
     |   when($"TYPE" === "Ready for next" && $"POSITION" === "E", $"FRUIT"))
dfFirstFruit: org.apache.spark.sql.DataFrame = [IDENTIFIER: int, NEXT_RECORDS: array<int> ... 4 more fields]

scala> 

scala> dfFirstFruit.show(false)
+----------+------------+-------------------+--------+----------+--------+
|IDENTIFIER|NEXT_RECORDS|TYPE               |POSITION|FRUIT     |FF_INTER|
+----------+------------+-------------------+--------+----------+--------+
|11        |[31]        |Ready for next     |E       |Apple     |Apple   |
|21        |[31]        |Ready for next     |E       |Apple     |Apple   |
|31        |[41]        |Ready from previous|X       |Lemon     |null    |
|31        |[51]        |Ready from previous|X       |Lemon     |null    |
|41        |[61]        |Ready for next     |X       |Orange    |null    |
|51        |[71]        |Ready for next     |X       |Orange    |null    |
|61        |[81]        |Ready from previous|X       |Strawberry|null    |
|71        |[81]        |Ready from previous|X       |Strawberry|null    |
|81        |[]          |Ready for next     |X       |Pineapple |null    |
|91        |[101]       |Ready for next     |E       |Cherry    |Cherry  |
|101       |[]          |Ready from previous|X       |Orange    |null    |
+----------+------------+-------------------+--------+----------+--------+


scala> 

scala> val overColumns = org.apache.spark.sql.expressions.Window.orderBy("IDENTIFIER")
overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@6b31241e

scala> dfFirstFruit.withColumn("FRUIT_FIRST", 
     |   org.apache.spark.sql.functions.last("FF_INTER", true).over(overColumns))
res11: org.apache.spark.sql.DataFrame = [IDENTIFIER: int, NEXT_RECORDS: array<int> ... 5 more fields]

scala> res11.show(false)
20/04/29 12:28:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------+------------+-------------------+--------+----------+--------+-----------+
|IDENTIFIER|NEXT_RECORDS|TYPE               |POSITION|FRUIT     |FF_INTER|FRUIT_FIRST|
+----------+------------+-------------------+--------+----------+--------+-----------+
|11        |[31]        |Ready for next     |E       |Apple     |Apple   |Apple      |
|21        |[31]        |Ready for next     |E       |Apple     |Apple   |Apple      |
|31        |[41]        |Ready from previous|X       |Lemon     |null    |Apple      |
|31        |[51]        |Ready from previous|X       |Lemon     |null    |Apple      |
|41        |[61]        |Ready for next     |X       |Orange    |null    |Apple      |
|51        |[71]        |Ready for next     |X       |Orange    |null    |Apple      |
|61        |[81]        |Ready from previous|X       |Strawberry|null    |Apple      |
|71        |[81]        |Ready from previous|X       |Strawberry|null    |Apple      |
|81        |[]          |Ready for next     |X       |Pineapple |null    |Apple      |
|91        |[101]       |Ready for next     |E       |Cherry    |Cherry  |Cherry     |
|101       |[]          |Ready from previous|X       |Orange    |null    |Cherry     |
+----------+------------+-------------------+--------+----------+--------+-----------+

...