Учитывая, что у вас уже есть функция, которая возвращает адрес в виде карты, вы можете создать UDF, который преобразует эту карту в структуру, а затем выбрать все поля карты:
import org.apache.spark.sql.functions.*
// For demo only
def getAddress(id: Int): Option[Map[String, String]] = {
id match {
case 1 => Some(Map("apt_no" -> "12", "street" -> "Main Street", "city" -> "NY", "zip" -> "1234"))
case 2 => Some(Map("apt_no" -> "1", "street" -> "Back Street", "city" -> "Gotham", "zip" -> "G123"))
case _ => None
}
}
case class Address(apt_no: String, street: String, city: String, zip: String)
def getAddressUdf = udf((id: Int) => {
getAddress(id) map (m =>
Address(m("apt_no"), m("street"), m("city"), m("zip"))
)
})
udf()
transformsфункции, которые возвращают экземпляры класса case в UDF, которые возвращают столбцы структуры с соответствующей схемой. Option[_]
возвращаемые типы автоматически преобразуются в null
-подобные типы данных. Затем поля столбца структуры можно расширить на несколько столбцов, выбрав их с помощью $"struct_col_name.*"
:
scala> val df = Seq(Person(1, "John", 32), Person(2, "Cloe", 27), Person(3, "Pete", 55)).toDS()
df: org.apache.spark.sql.Dataset[Person] = [id: int, name: string ... 1 more field]
scala> df.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 1|John| 32|
| 2|Cloe| 27|
| 3|Pete| 55|
+---+----+---+
scala> df
| .withColumn("addr", getAddressUdf($"id"))
| .select($"id", $"name", $"age", $"addr.*")
| .show()
+---+----+---+------+------------+------+-----+
| id|name|age|apt_no| street| city| zip|
+---+----+---+------+------------+------+-----+
| 1|John| 32| 12| Main Street| NY| 1234|
| 2|Cloe| 27| 1| Back Street|Gotham| G123|
| 3|Pete| 55| null| null| null| null|
+---+----+---+------+------------+------+-----+