Передать структуру в UDAF в искре - PullRequest
0 голосов
/ 04 февраля 2019

У меня есть следующая схема -

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)
 |-- name: string (nullable = true)

Как передать структуру 'cars' в udaf?Какой должна быть inputSchema, если я просто хочу передать структуру автомобилей.

1 Ответ

0 голосов
/ 05 февраля 2019

Можно, но логика UDAF была бы другой.Например, если у вас есть две строки:

val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))

val rdd = spark.sparkContext.parallelize(seq)

Здесь схема имеет вид

root
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)

, тогда, если вы попытаетесь вызвать агрегацию:

val df = seq.toDF
df.agg(agg0(col("cars")))

YouВы должны изменить свою схему ввода UDAF, например:

val carsSchema =
    StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))

, и в мальчике вашего UDAF вы должны иметь дело с этой схемой, изменяя inputSchema:

override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)

В вашем методе обновления вы должны иметь делос форматом ваших строк ввода:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val i = input.getAs[Array[Array[String]]](0)
  // i here would be [car1,car2,car3],  an array of strings
  buffer(0) = ???
}

Отсюда вы можете преобразовать i для обновления буфера и завершения функций слияния и оценки.

...