Как добавить новые столбцы и соответствующие значения строки для фрейма данных spark? - PullRequest
0 голосов
/ 01 ноября 2019

Я новичок в мире скала / искра. Изо всех сил, чтобы найти решение варианта использования. Было бы здорово, если бы вы могли помочь.

У меня есть набор искровых данных (df с классом дел) с именем person.

scala> val person_with_contact = person.map(r => (
     | r.id,
     | r.name,
     | r.age
     | )).toDF()

Теперь я хочу добавить список атрибутов адреса (например, apt_no, улица, город, почтовый индекс) для каждой записи этого набора данных. Получить список атрибутов адреса, у меня есть функция, которая принимает идентификатор человека в качестве входных данных и возвращает карту, которая содержит все атрибуты адреса и их соответствующие значения.

Я попробовал следующее и несколько других SO, предложенных подходов, но пока не смог решить. (Ссылка - статический столбец ex - Spark, добавить новый столбец с тем же значением в Scala )

scala> val person_with_contact = person.map(r => (
    | r.id,
    | r.name,
    | r.age,
    | getAddress(r.id) 
    | )).toDF()

Конечный кадр данных должен иметь следующие столбцы.

id, name, age, apt_no, street, city, zip

Можете ли вы помочь?

Ответы [ 2 ]

1 голос
/ 01 ноября 2019

используйте udf

package yourpackage

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object MainDemo {

  def getAddress(id: Int): String = {
    //do your things
    "address id:" + id
  }

  def getCity(id: String): String = {
    //do your things
    "your city :" + id
  }

  def getZip(id: String): String = {
    //do your things
    "your zip :" + id
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[3]").enableHiveSupport().getOrCreate()
    val person = Seq(Person(1, "name_m", 21), Person(2, "name_w", 40))
    import spark.implicits._
    val person_with_contact = person.map(r => (r.id, r.name, r.age, getAddress(r.id))).toDF("id", "name", "age", "street")
    person_with_contact.printSchema()
    //root
    // |-- id: integer (nullable = false)
    // |-- name: string (nullable = true)
    // |-- age: integer (nullable = false)
    // |-- street: string (nullable = true)
    val result = person_with_contact.select(
      col("id"),
      col("name"),
      col("age"),
      col("street"),
      udf { id: String =>
        val city = getCity(id)
        city
      }.apply(col("id")).as("city"),
      udf { id: String =>
        val city = getZip(id)
        city
      }.apply(col("id")).as("zip")
    )
    result.printSchema()
    //root
    // |-- id: integer (nullable = false)
    // |-- name: string (nullable = true)
    // |-- age: integer (nullable = false)
    // |-- street: string (nullable = true)
    // |-- city: string (nullable = true)
    // |-- zip: string (nullable = true)
    result.show()
    //+---+------+---+------------+------------+-----------+
    //| id|  name|age|      street|        city|        zip|
    //+---+------+---+------------+------------+-----------+
    //|  1|name_m| 21|address id:1|your city :1|your zip :1|
    //|  2|name_w| 40|address id:2|your city :2|your zip :2|
    //+---+------+---+------------+------------+-----------+
  }
}

0 голосов
/ 01 ноября 2019

Учитывая, что у вас уже есть функция, которая возвращает адрес в виде карты, вы можете создать UDF, который преобразует эту карту в структуру, а затем выбрать все поля карты:

import org.apache.spark.sql.functions.*

// For demo only
def getAddress(id: Int): Option[Map[String, String]] = {
  id match {
    case 1 => Some(Map("apt_no" -> "12", "street" -> "Main Street", "city" -> "NY", "zip" -> "1234"))
    case 2 => Some(Map("apt_no" -> "1", "street" -> "Back Street", "city" -> "Gotham", "zip" -> "G123"))
    case _ => None
  }
}

case class Address(apt_no: String, street: String, city: String, zip: String)

def getAddressUdf = udf((id: Int) => {
  getAddress(id) map (m =>
    Address(m("apt_no"), m("street"), m("city"), m("zip"))
  )
})

udf() transformsфункции, которые возвращают экземпляры класса case в UDF, которые возвращают столбцы структуры с соответствующей схемой. Option[_] возвращаемые типы автоматически преобразуются в null -подобные типы данных. Затем поля столбца структуры можно расширить на несколько столбцов, выбрав их с помощью $"struct_col_name.*":

scala> val df = Seq(Person(1, "John", 32), Person(2, "Cloe", 27), Person(3, "Pete", 55)).toDS()
df: org.apache.spark.sql.Dataset[Person] = [id: int, name: string ... 1 more field]

scala> df.show()
+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 32|
|  2|Cloe| 27|
|  3|Pete| 55|
+---+----+---+

scala> df
     | .withColumn("addr", getAddressUdf($"id"))
     | .select($"id", $"name", $"age", $"addr.*")
     | .show()
+---+----+---+------+------------+------+-----+
| id|name|age|apt_no|      street|  city|  zip|
+---+----+---+------+------------+------+-----+
|  1|John| 32|    12| Main Street|    NY| 1234|
|  2|Cloe| 27|     1| Back Street|Gotham| G123|
|  3|Pete| 55|  null|        null|  null| null|
+---+----+---+------+------------+------+-----+
...