Преобразование столбца collect_list в столбец другого типа данных с использованием UDF в потоковой передаче искры - PullRequest
0 голосов
/ 04 июля 2018

Требуется объединить данные из 2 потоковых источников, а затем уменьшить их для того же ключа и применить функцию для значений, чтобы преобразовать их в другой UDO (определенный пользователем тип объекта). Я понятия не имею, как это сделать. Итак, здесь я только что создал похожую проблему, как показано ниже:

2 входных потока имеют 3 столбца, id, значение и время отправки. Сначала мы объединяем его, а затем уменьшаем на основе идентификатора и получаем конечный результат со значениями (идентификатор, пользовательская функция (значения, время ожидания)). Как этого добиться?

Если я реализую это следующим образом:

val dff = df.union(df2)
  .withWatermark("posttime", "15 minutes")
  .groupBy(window($"posttime", "10 minutes", "5 minutes"),$"id")
  .agg(collect_list(struct("value", "posttime")).as("data"))
  .withColumn("data", user-defined-function("data"))

Как определить пользовательскую функцию для ввода столбца и вывода столбца с другим типом данных?

// How to tranform the Column (Type: A) to output Column (Type: B)
def user-defined-function(columnName: String): Column = {
  val x = Column(columnName).cast(List<struct>)
  val ptime = if(x.posttime < y.posttime) x.posttime else y.posttime
  val value = (x.value.toInt + y.value.toInt).toString
  return new Column(struct(value, ptime))
}

Ответы [ 2 ]

0 голосов
/ 04 июля 2018

Я видел ваш UDF и то, что вы пытаетесь сделать. Я изменил ваш логин для оптимизации. Просто проверьте, вы получаете ожидаемый результат или нет.

val dff = df.union(df2)
  .withWatermark("posttime", "15 minutes")
  .groupBy(window($"posttime", "10 minutes", "5 minutes"),$"id")
  .agg(struct(sum($"value").as("value"), min($"posttime").as("posttime")).as("data"))

или также

val dff = df.union(df2)
  .withWatermark("posttime", "15 minutes")
  .groupBy(window($"posttime", "10 minutes", "5 minutes"),$"id")
  .agg(sum($"value").as("value"), min($"posttime").as("posttime"))
0 голосов
/ 04 июля 2018

Обычно вы определяете UDF следующим образом:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

// assumption: value is of type double, posttime is timestamp
val myUDF = udf((data:Seq[Row]) => {
  val x: Seq[(Double, Timestamp)] = data.map{case Row(x:Double,y:java.sql.Timestamp) => (x,y)}

  // do something with x 

})

Тип возвращаемого значения UDF определяется типом последнего выражения. Вам не нужно создавать Column в UDF, просто используйте простые типы Scala

...