Взрывающаяся вложенная структура в кадре данных Spark, имеющем другую схему - PullRequest
0 голосов
/ 06 января 2020

У меня есть json, который имеет схему ниже:

 |-- Pool: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |    |    |-- 2: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)
 |    |-- 2: struct (nullable = true)
 |    |    |-- Alias: string (nullable = true)
 |    |    |-- Chaddr: string (nullable = true)
 |    |    |-- ChaddrMask: string (nullable = true)
 |    |    |-- Client: struct (nullable = true)
 |    |    |    |-- 1: struct (nullable = true)
 |    |    |    |    |-- Active: boolean (nullable = true)
 |    |    |    |    |-- Alias: string (nullable = true)
 |    |    |    |    |-- Chaddr: string (nullable = true)

И вывод, который я пытаюсь достичь:

 PoolId ClientID Client_Active
 1      1        true
 1      2        false
 2      1        true

Эта схема постоянно меняется с json. Например, на данный момент есть 2 идентификатора пула, может быть другой json, который будет иметь 5 идентификаторов пула, и то же самое с идентификатором CLient.

Проблема с:

  1. Мы не можем использовать Explode на struct.
  2. Пул не может быть преобразован в карту, поскольку каждый раз, когда у клиента разный идентификатор клиента, что приводит к разной схеме для каждой строки.

Есть мысли, как этого добиться?

Я попробовал эту ссылку для преобразования в Struct to Map и затем взрыва, но он не работает, когда в разных пулах разное количество идентификаторов клиентов.

1 Ответ

4 голосов
/ 06 января 2020

С моей точки зрения, вам нужно только определить UDF.

Вот пример:

  1. Определить класс случая проекции (что вы хотите в качестве результирующей структуры)
case class Projection(PoolId: String, ClientID: String, Client_Active: Boolean)
Определите UDF, как показано ниже, позволяя вам работать как со своей структурой (полями), так и с данными:
val myUdf = udf{r: Row =>
  r.schema.fields.flatMap{rf =>
    val poolId = rf.name
    val pool = r.getAs[Row](poolId)
    val clientRow = pool.getAs[Row]("Client")
    clientRow.schema.fields.map{cr =>
      val clientId = cr.name
      val isActive = clientRow.getAs[Row](clientId).getAs[Boolean]("Active")
      Projection(poolId, clientId, isActive)
    }
  }
}
Используй свой UDF:
val newDF = df.select(explode(myUdf($"Pool")).as("projection"))
    .select("projection.*")
    .cache

newDF.show(false)

Выход будет ожидаемым:

+------+--------+-------------+
|PoolId|ClientID|Client_Active|
+------+--------+-------------+
|1     |1       |true         |
|1     |2       |false        |
|2     |1       |true         |
+------+--------+-------------+
...