Обработка динамической схемы ввода Spark UDAF - PullRequest
0 голосов
/ 06 февраля 2019

Я знаю, как передать структуру с внутренней структурой в UDAF из этого - Передать структуру в UDAF в искре

Но как мне обрабатывать случаи, когда схема внутренней структурынеизвестен или динамичен в том смысле, что он изменяется в зависимости от данных.Некоторые поля могут существовать или не существовать, поскольку входные данные не соответствуют конкретной схеме.Допустим, один набор данных имеет

   root
     |-- id:string (nullable = false)
     |-- age: long (nullable = true)
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
     |-- name: string (nullable = true)

А другой набор данных не имеет car3

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |-- name: string (nullable = true)

Как мне написать UDAF, который принимает схему, котораяизменения на основе входных данных.

1 Ответ

0 голосов
/ 07 февраля 2019

Схема может быть передана динамически при инициализации класса Udaf -

    val yetAnotherUdaf = new YetAnotherUdaf(schema)

    case class YetAnotherUdaf(schema:StructType) extends UserDefinedAggregateFunction {

      override def deterministic:Boolean=true
      override def dataType:DataType=schema
      override def inputSchema:StructType=schema
      override def bufferSchema:StructType=schema

      override def initialize(buffer:MutableAggregationBuffer):Unit={ ??? }
      override def update(buffer:MutableAggregationBuffer, input:Row):Unit={ ??? }
      override def merge(buffer1:MutableAggregationBuffer, buffer2:Row):Unit={???}
      override def evaluate(buffer:Row):StructType={ ??? }
   }
...