Spark -Scala Вложенный массив DF - Как обновить значение на основе условия без изменения структуры? - PullRequest
1 голос
/ 20 сентября 2019

У меня есть структура, подобная следующей в формате orc / parquet.

{
  "Register": {
    "Persons": [
      {
        "Name": "Name1",
        "Age": 12,
        "Address": [
          {
            "Apt": "Apt1"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 1234
          }
        ]
      },
      {
        "Name": "Name2",
        "Age": 14,
        "Address": [
          {
            "Apt": "Apt2"
          }
        ],
        "Phone": [
          {
            "PhoneNum": 55555
          }
        ]
      }

    ]
  }
}

Мне нужно создать новый DF на основе условия Apt = Apt1 и изменить номер телефона этой записи на 7777Н.Б .: Нужно сохранить ту же структуру.Я опробовал несколько методов в scala-spark, но не смог обновить тип структуры вложенного массива.Любой совет эксперта будет полезен.

Обновление: по этой ссылке я могу получить переменные named_struct.Когда дело доходит до массива, я не могу получить ответ.https://kb.databricks.com/data/update-nested-column.html#how-to-update-nested-columns

Ответы [ 2 ]

0 голосов
/ 24 сентября 2019

Идея состоит в том, чтобы использовать case-классы , чтобы преобразовать вложенную структуру в набор простых классов Scala, которые проще обрабатывать - или в терминах Spark: использовать (типизированный) Dataset вместо нетипизированного DataFrame.

case class Phone(var PhoneNum:String)
case class Apt(Apt:String)
case class Person(Name: String, Age: Long, Address:Array[Apt], Phone:Array[Phone])
case class Register(Persons:Array[Person])
case class TopLevel(Register:Register)

Преобразование кадра данных в набор данных и затем применение вызова map к каждой записи набора данных:

val df = ...
val ds = df.as[TopLevel]
val transformed = ds.map(tl => {
  for( p <- tl.Register.Persons) {
    if(p.Address.contains(Apt("Apt1"))) p.Phone.transform(_ => Phone("7777"))
  }
  tl
})
transformed.toJSON.show(false)

отпечатков:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Register":{"Persons":[{"Name":"Name1","Age":12,"Address":[{"Apt":"Apt1"}],"Phone":[{"PhoneNum":"7777"}]},{"Name":"Name2","Age":14,"Address":[{"Apt":"Apt2"}],"Phone":[{"PhoneNum":"55555"}]}]}}|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Замечание по структуре / схеме данных в вопросе :

В ответ на вопрос используется рамка данных регистров.Это означает, что каждая запись в кадре данных содержит один регистр.Было бы более интуитивно понятно, если бы информационный блок содержал список лиц и этот список людей назывался «Регистр».Это привело бы к гораздо более легкой структуре данных.В этом случае классы TopLevel и Register могут быть опущены.

0 голосов
/ 21 сентября 2019

Первым шагом является сопоставление вашего json в dataframe. Затем мы создаем пользовательский UDF, который принимает на вход столбец Apt, столбец PhoneNum и новый номер телефона, позволяющий изменить номер телефона, если Apt = Apt1

  def main(args: Array[String]): Unit = {


    val inputJson = "{\"Register\":{\"Persons\":[{\"Name\":\"Name1\",\"Age\":12,\"Address\":[{\"Apt\":\"Apt1\"}],\"Phone\":[{\"PhoneNum\":1234}]},{\"Name\":\"Name2\",\"Age\":14,\"Address\":[{\"Apt\":\"Apt2\"}],\"Phone\":[{\"PhoneNum\":55555}]}]}}"


    import sparkSession.implicits._

    val outputDataFrame = sparkSession.read.option("multiline", true).option("mode","PERMISSIVE")
      .json(Seq(inputJson).toDS)
      .select(
          // First layer mapping
          col("Register").getItem("Persons").as("Persons")
        )
      .withColumn("Persons", explode(col("Persons")))
        .select(
          // Second layer mapping
          col("Persons").getItem("Name").as("Name"),
          col("Persons").getItem("Age").as("Age"),
          col("Persons").getItem("Address").as("Address"),
          col("Persons").getItem("Phone").as("Phone")
        )
        .select(col("Name"),col("Age"),
          // last layer mapping
          col("Address").getItem("Apt").as("Apt"),
          col("Phone").getItem("PhoneNum").as("PhoneNum"))
        .withColumn("Apt", explode(col("Apt")))
        .withColumn("PhoneNum", explode(col("PhoneNum")))
        .withColumn("PhoneNum", changePhoneNumUDF(col("Apt"), col("PhoneNum"), lit(987654))) // apply user defined function to change PhoneNume according to Apt

    outputDataFrame.show


  }
  def changePhoneNum(Apt : String, oldPhoneNum : Long ,NewPhoneNum : Long) : Long = Apt match {
    case "Apt1" => NewPhoneNum
    case _ => oldPhoneNum
  }
  val changePhoneNumUDF = udf(changePhoneNum _)
}

Выход:

+-----+---+----+--------+
| Name|Age| Apt|PhoneNum|
+-----+---+----+--------+
|Name1| 12|Apt1|  987654|
|Name2| 14|Apt2|   55555|
+-----+---+----+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...