Я пытаюсь создать функцию spark, которая рекурсивно перезаписывает столбцы ArrayType:
import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.functions._
val arrayHead = udf((sequence: Seq[String]) => sequence.head)
val arrayTail = udf((sequence: Seq[String]) => sequence.tail)
// re-produces the ArrayType column recursively
val rewriteArrayCol = (c: Column) => {
def helper(elementsRemaining: Column, outputAccum: Column): Column = {
when(size(elementsRemaining) === lit(0), outputAccum)
.otherwise(helper(arrayTail(elementsRemaining), concat(outputAccum, array(arrayHead(elementsRemaining)))))
}
helper(c, array())
}
// Test
val df =
Seq("100" -> Seq("a", "b", "b", "b", "b", "b", "c", "c", "d"))
.toDF("id", "sequence")
// .withColumn("test_tail", arrayTail($"sequence")) //head & tail udfs work
// .withColumn("test", rewriteArrayCol($"sequence")) //stackoverflow if uncommented
display(df)
К сожалению, я получаю переполнение стека.Я считаю, что одной из областей, в которых отсутствует эта функция, является то, что она не является хвостовой рекурсией;т. е. весь блок 'when (). else ()' не совпадает с блоком 'if else'.Тем не менее, функция в настоящее время генерирует переполнение стека при применении к даже крошечным фреймам данных (поэтому я считаю, что в этом должно быть больше ошибок, чем просто отсутствие хвостовой рекурсии).
Я не смог найтиЛюбые примеры аналогичной функции в Интернете, поэтому я подумал, что я хотел бы спросить здесь.Единственные реализации функций Column => Column, которые мне удалось найти, - это очень и очень простые из них, которые не помогли в этом случае.
Примечание: я могу достичь функциональностивыше с использованием UDF.Причина, по которой я пытаюсь создать функцию Column => Column, заключается в том, что Spark лучше оптимизирует их по сравнению с пользовательскими функциями (насколько я знаю).