Получить разделенный запятыми список соответствующих значений из указанных столбцов в столбце - PullRequest
0 голосов
/ 16 марта 2020

(Spark 2.3)
У меня есть искровой фрейм данных со столбцами (разделенными запятыми), указанными в одном из столбцов «атрибут».

+---------+---------+-----+-------+---------------+
| fname | lname | age | dept |  attributes        |
+---------+---------+-----+-------+---------------+
|  Jack  |  Felice |  25  |   IT   | fname,age    |
|  Mike  | Gilbert |  30  |   CS |lname,dept      |
|  John  |  Shen  |  45  |   DR | age,dept        |
+---------+---------+-----+-------+---------------+

Я пытаюсь получить соответствующие значения в списке через запятую в новом столбце вывода. Как этого добиться в scala?

Ожидаемый фрейм данных с выходным столбцом:

+---------+---------+-----+-------+---------------+----------------+
| fname | lname | age | dept |  attributes   |      output         |
+---------+---------+-----+-------+---------------+----------------+
|  Jack  |  Felice |  25  |   IT   | fname,age |    Jack,25        |
|  Mike  | Gilbert |  30  |   CS | lname,dept|  Gilbert,CS         |
|  John  |  Shen  |  45  |   DR | age,dept    |    45,DR           |
+---------+---------+-----+-------+---------------+----------------+

Ответы [ 2 ]

3 голосов
/ 16 марта 2020
scala> import org.apache.spark.sql.expressions.UserDefinedFunction
scala> import org.apache.spark.sql.Row

scala> df.show()
+-----+---------+---+----+----------+
|fname|    lname|age|dept|attributes|
+-----+---------+---+----+----------+
| Jack|  Felice | 25| IT | fname,age|
| Mike| Gilbert | 30| CS |lname,dept|
| John|     Shen| 45| DR |  age,dept|
+-----+---------+---+----+----------+


scala> def getValues:UserDefinedFunction = udf((R:Row) => {
     | val attrList = R.getAs("attributes").toString.split(",").toList
     | var out =  ""
     | attrList.foreach{ x =>
     | out = out + "," + R.getAs(x).toString
     | }
     | out.replaceFirst(s""",""","")
     | })

scala> df.withColumn("outPut", getValues(struct(df.columns map col: _*))).show()
+-----+---------+---+----+----------+-------------+
|fname|    lname|age|dept|attributes|       outPut|
+-----+---------+---+----+----------+-------------+
| Jack|  Felice | 25| IT | fname,age|      Jack,25|
| Mike| Gilbert | 30| CS |lname,dept| Gilbert ,CS |
| John|     Shen| 45| DR |  age,dept|       45,DR |
+-----+---------+---+----+----------+-------------+
3 голосов
/ 16 марта 2020

Это может быть сделано с использованием элементарных API, т.е. RDD. Выполните операцию отображения на существующем RDD строк, обновите строку и верните ее. преобразовать rdd [row] в Dataframe.

Вы можете попробовать последние API, т.е. Dataframe & Dataset.

import org.apache.spark.sql.types.{StructField, StructType, StringType}
val newSchema = StructType(df.schema.fields ++ Array(StructField("output", StringType, false)))

val rdd = df.rdd.map(row => {
  val attributes = row.getAs[String]("attributes")
  val k1 = row.getAs[String](attributes.split(",")(0))
  val k2 = row.getAs[String](attributes.split(",")(1))
  val output = s"$k1,$k2"
  val newRow = Row.fromSeq(row.toSeq ++ Array(output))
  newRow
})

val newDf = spark.sqlContext.createDataFrame(rdd, newSchema)
newDf.show(10)
//Output
+-----+-------+---+----+----------+----------+
|fname|  lname|age|dept|attributes|    output|
+-----+-------+---+----+----------+----------+
| Jack| Felice| 25|  IT| fname,age|   Jack,25|
| Mike|Gilbert| 30|  CS|lname,dept|Gilbert,CS|
| John|   Shen| 45|  DR|  age,dept|     45,DR|
+-----+-------+---+----+----------+----------+
...