Примените функцию UDF к окну Spark, где входной параметр представляет собой список всех значений столбцов в диапазоне - PullRequest
0 голосов
/ 27 марта 2019

Я хотел бы построить скользящую среднюю на каждом ряду в окне. Допустим, -10 строк. НО, если доступно менее 10 строк, я бы хотел вставить 0 в результирующую строку -> новый столбец. Итак, чего я бы хотел добиться, это использовать UDF в агрегатном окне с входным параметром List () (или любым другим суперклассом), в котором есть значения всех доступных строк.

Вот пример кода, который не работает:

val w = Window.partitionBy("id").rowsBetween(-10, +0)
dfRetail2.withColumn("test", udftestf(dfRetail2("salesMth")).over(w))

Expected output: List( 1,2,3,4), если больше нет доступных строк, и примите это как параметр ввода для функции udf. Функция udf должна возвращать вычисленное значение или 0, если доступно менее 10 строк.

вышеуказанный код завершается: Expression 'UDF(salesMth#152L)' not supported within a window function.;;

1 Ответ

0 голосов
/ 27 марта 2019

Вы можете использовать встроенные в Spark функции Window вместе с when/otherwise для ваших конкретных условий без необходимости использования UDF / UDAF. Для простоты размер скользящего окна уменьшен до 4 в следующем примере с фиктивными данными:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = (1 to 2).flatMap(i => Seq.tabulate(8)(j => (i, i * 10.0 + j))).
  toDF("id", "amount")

val slidingWin = 4

val winSpec = Window.partitionBy($"id").rowsBetween(-(slidingWin - 1), 0)

df.
  withColumn("slidingCount", count($"amount").over(winSpec)).
  withColumn("slidingAvg", when($"slidingCount" < slidingWin, 0.0).
    otherwise(avg($"amount").over(winSpec))
  ).show
// +---+------+------------+----------+
// | id|amount|slidingCount|slidingAvg|
// +---+------+------------+----------+
// |  1|  10.0|           1|       0.0|
// |  1|  11.0|           2|       0.0|
// |  1|  12.0|           3|       0.0|
// |  1|  13.0|           4|      11.5|
// |  1|  14.0|           4|      12.5|
// |  1|  15.0|           4|      13.5|
// |  1|  16.0|           4|      14.5|
// |  1|  17.0|           4|      15.5|
// |  2|  20.0|           1|       0.0|
// |  2|  21.0|           2|       0.0|
// |  2|  22.0|           3|       0.0|
// |  2|  23.0|           4|      21.5|
// |  2|  24.0|           4|      22.5|
// |  2|  25.0|           4|      23.5|
// |  2|  26.0|           4|      24.5|
// |  2|  27.0|           4|      25.5|
// +---+------+------------+----------+

За замечание в разделе комментариев, я включаю решение через UDF ниже в качестве альтернативы:

def movingAvg(n: Int) = udf{ (ls: Seq[Double]) =>
  val (avg, count) = ls.takeRight(n).foldLeft((0.0, 1)){
    case ((a, i), next) => (a + (next-a)/i, i + 1) 
  }
  if (count <= n) 0.0 else avg  // Expand/Modify this for specific requirement
}

// To apply the UDF:
df.
  withColumn("average", movingAvg(slidingWin)(collect_list($"amount").over(winSpec))).
  show

Обратите внимание, что в отличие от sum или count, collect_list игнорирует rowsBetween() и генерирует многораздельные данные, которые потенциально могут быть очень большими для передачи в UDF (отсюда и необходимость takeRight()). Если вычисленных окон sum и count достаточно для того, что требуется для вашего конкретного требования, рассмотрите возможность передачи их в UDF.


В целом, особенно если имеющиеся данные уже в формате DataFrame, они будут работать и масштабироваться лучше, используя встроенный API DataFrame, чтобы воспользоваться преимуществами оптимизации механизма исполнения Spark, чем использование пользовательских UDF / UDAF. Возможно, вам будет интересно прочитать эту статью о преимуществах API DataFrame / Dataset по сравнению с UDF / UDAF.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...