Spark w / Scala способ использования нескольких разнородных столбцов в UDF - PullRequest
0 голосов
/ 11 февраля 2020

Скажем, у меня есть фрейм данных с несколькими столбцами, возможно, разных типов. Мне нужно написать UDF, который принимает входные данные из нескольких столбцов, выполняет довольно сложные вычисления и возвращает результат (например, строку).

val dataframe = Seq( (1.0, Array(0, 2, 1), Array(0, 2, 3), 23.0, 21.0),
                     (1.0, Array(0, 7, 1), Array(1, 2, 3), 42.0, 41.0)).toDF(
                     "c", "a1", "a2", "t1", "t2")

Например: ("c" * sum("a1") + sum("a2")).toString + "t1".toString

На самом деле вычисление занимает много времени, и массивы содержат около миллиона элементов. Я довольно новичок в Spark и был бы признателен, если бы предоставили пример кода или указатель на ресурс (с Scala примерами).

TIA

1 Ответ

1 голос
/ 11 февраля 2020

вот пример UDF:

val udf_doComputation = udf((c:Double, a1:Seq[Int],a2:Seq[Int],t1:Double) => {
      // your complex computation goes here
      (c*a1.sum+a2.sum).toString() + t1.toString()
    })

 dataframe
   .withColumn("result",udf_doComputation($"c",$"a1",$"a2",$"t1"))
   .show()

дает:

+---+---------+---------+----+----+--------+
|  c|       a1|       a2|  t1|  t2|  result|
+---+---------+---------+----+----+--------+
|1.0|[0, 2, 1]|[0, 2, 3]|23.0|21.0| 8.023.0|
|1.0|[0, 7, 1]|[1, 2, 3]|42.0|41.0|14.042.0|
+---+---------+---------+----+----+--------+

Обратите внимание, что имена переменных UDF не обязательно должны совпадать с именами столбцов, но типы должны match:

  • примитивы типа A отображаются прямо на A. Но есть несколько допустимых отображений, например, double в карте данных либо на Double, либо на java.lang.Double et c. Но вы не можете сопоставить с Option[A]! Поэтому, если ваш ввод может быть нулевым, вам нужно использовать соответствующие типы из java.lang.* ...
  • array примитивов типа A, сопоставляемых с Seq[A], например, array<int> сопоставления с Seq[Int]. Тип бетона будет WrappedArray, поэтому сопоставление с этим или IndexedSeq также будет работать. Важно знать, что тип среды выполнения проиндексирован.
  • struct соответствует Row
  • array<struct> соответствует Seq[Row]
...