Spark: рекурсивная функция 'ArrayType Column => ArrayType Column' - PullRequest
0 голосов
/ 30 января 2019

Я пытаюсь создать функцию spark, которая рекурсивно перезаписывает столбцы ArrayType:

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.functions._

val arrayHead = udf((sequence: Seq[String]) => sequence.head)
val arrayTail = udf((sequence: Seq[String]) => sequence.tail)

// re-produces the ArrayType column recursively
val rewriteArrayCol = (c: Column) => {

  def helper(elementsRemaining: Column, outputAccum: Column): Column = {

    when(size(elementsRemaining) === lit(0), outputAccum)
    .otherwise(helper(arrayTail(elementsRemaining), concat(outputAccum, array(arrayHead(elementsRemaining)))))
  }

  helper(c, array())
}


// Test
val df = 
  Seq("100"  -> Seq("a", "b", "b", "b", "b", "b", "c", "c", "d"))
  .toDF("id", "sequence")
//  .withColumn("test_tail", arrayTail($"sequence"))   //head & tail udfs work
//  .withColumn("test", rewriteArrayCol($"sequence"))  //stackoverflow if uncommented

display(df)

К сожалению, я получаю переполнение стека.Я считаю, что одной из областей, в которых отсутствует эта функция, является то, что она не является хвостовой рекурсией;т. е. весь блок 'when (). else ()' не совпадает с блоком 'if else'.Тем не менее, функция в настоящее время генерирует переполнение стека при применении к даже крошечным фреймам данных (поэтому я считаю, что в этом должно быть больше ошибок, чем просто отсутствие хвостовой рекурсии).

Я не смог найтиЛюбые примеры аналогичной функции в Интернете, поэтому я подумал, что я хотел бы спросить здесь.Единственные реализации функций Column => Column, которые мне удалось найти, - это очень и очень простые из них, которые не помогли в этом случае.

Примечание: я могу достичь функциональностивыше с использованием UDF.Причина, по которой я пытаюсь создать функцию Column => Column, заключается в том, что Spark лучше оптимизирует их по сравнению с пользовательскими функциями (насколько я знаю).

1 Ответ

0 голосов
/ 30 января 2019

Это не сработает, потому что здесь нет значимого условия остановки.when / otherwise не являются блоками потока управления на уровне языка (следовательно, не могут прервать выполнение), и функция будет просто возвращаться навсегда.

Фактически, она не остановится даже для пустого массива, вне любогоКонтекст оценки SQL:

rewriteArrayCol(array())

Более того, вы предполагаете, что неверны.Пропуская тот факт, что ваш код десериализует данные дважды (один раз для каждого arrayHead, arrayTail), что намного хуже, чем просто вызов udf один раз (хотя этого можно избежать с помощью slice), очень сложные выражения приходят со своими собственнымипроблемы, одной из которых является ограничение размера генерации кода.

Не отчаивайтесь, хотя там уже есть действительное решение - transform.См. Как использовать функцию преобразования высшего порядка?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...