Как динамически добавлять столбцы в DataFrame? - PullRequest
1 голос
/ 20 января 2020

Я пытаюсь динамически добавить столбцы в DataFrame из Seq of String.

Вот пример: исходный dataframe имеет вид:

+-----+---+----+---+---+
|id | A | B  | C  | D  |
+-----+---+----+---+---+
|1 |toto|tata|titi|    |
|2 |bla |blo |   |     |
|3 |b   | c  |  a |  d |
+-----+---+----+---+---+

У меня также есть Seq Строка, которая содержит название столбцов, которые я хочу добавить. Если в исходном DataFrame столбец уже существует, он должен иметь некоторые различия, как показано ниже:

Seq выглядит следующим образом:

val columns = Seq("A", "B", "F", "G", "H")

Ожидаемое значение:

+-----+---+----+---+---+---+---+---+
|id | A | B  | C  | D  | F | G | H |
+-----+---+----+---+---+---+---+---+
|1 |toto|tata|titi|tutu|null|null|null
|2 |bla |blo |   |     |null|null|null|
|3 |b   | c  |  a |  d |null|null|null|
+-----+---+----+---+---+---+---+---+

То, что я сделал до сих пор, выглядит примерно так:

val difference = columns diff sourceDF.columns
val finalDF = difference.foldLeft(sourceDF)((df, field) => if (!sourceDF.columns.contains(field)) df.withColumn(field, lit(null))) else df)
  .select(columns.head, columns.tail:_*) 

Но я не могу понять, как сделать это с помощью Spark более эффективно, чтобы было проще и проще читать ...

Заранее спасибо

Ответы [ 2 ]

2 голосов
/ 22 января 2020

Вот еще один способ использования Seq.diff, одиночных select и map для генерации вашего окончательного списка столбцов:

import org.apache.spark.sql.functions.{lit, col}


val newCols = Seq("A", "B", "F", "G", "H")

val updatedCols = newCols.diff(df.columns).map{ c => lit(null).as(c)}

val selectExpr = df.columns.map(col) ++ updatedCols

df.select(selectExpr:_*).show

// +---+----+----+----+----+----+----+----+
// | id|   A|   B|   C|   D|   F|   G|   H|
// +---+----+----+----+----+----+----+----+
// |  1|toto|tata|titi|null|null|null|null|
// |  2| bla| blo|null|null|null|null|null|
// |  3|   b|   c|   a|   d|null|null|null|
// +---+----+----+----+----+----+----+----+

Сначала мы находим различие между newCols и df.columns, это дает нам : F, G, H. Затем мы преобразуем каждый элемент списка в lit(null).as(c) с помощью функции map. Наконец, мы объединяем существующий и новый список вместе, чтобы получить selectExpr, который используется для select.

1 голос
/ 20 января 2020

Ниже будет оптимизирован ваш лог c.

scala> df.show
+---+----+----+----+----+
| id|   A|   B|   C|   D|
+---+----+----+----+----+
|  1|toto|tata|titi|null|
|  2| bla| blo|null|null|
|  3|   b|   c|   a|   d|
+---+----+----+----+----+

scala> val Columns  = Seq("A", "B", "F", "G", "H")

scala> val newCol =  Columns filterNot df.columns.toSeq.contains

scala> val df1 =  newCol.foldLeft(df)((df,name) => df.withColumn(name, lit(null)))
scala> df1.show()
+---+----+----+----+----+----+----+----+
| id|   A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+----+
|  1|toto|tata|titi|null|null|null|null|
|  2| bla| blo|null|null|null|null|null|
|  3|   b|   c|   a|   d|null|null|null|
+---+----+----+----+----+----+----+----+

Если вы не хотите использовать foldLeft, вы можете использовать RunTimeMirror, который будет быстрее. Проверьте ниже код.

scala> import scala.reflect.runtime.universe.runtimeMirror
scala> import scala.tools.reflect.ToolBox
scala> import org.apache.spark.sql.DataFrame

scala> df.show
+---+----+----+----+----+
| id|   A|   B|   C|   D|
+---+----+----+----+----+
|  1|toto|tata|titi|null|
|  2| bla| blo|null|null|
|  3|   b|   c|   a|   d|
+---+----+----+----+----+


scala> def compile[A](code: String): DataFrame => A = {
     |     val tb = runtimeMirror(getClass.getClassLoader).mkToolBox()
     |     val tree = tb.parse(
     |       s"""
     |          |import org.elasticsearch.spark.sql._
     |          |import org.apache.spark.sql.DataFrame
     |          |def wrapper(context:DataFrame): Any = {
     |          |  $code
     |          |}
     |          |wrapper _
     |       """.stripMargin)
     | 
     |     val fun = tb.compile(tree)
     |     val wrapper = fun()
     |     wrapper.asInstanceOf[DataFrame => A]
     |   }


scala> def  AddColumns(df:DataFrame,withColumnsString:String):DataFrame = {
     |     val code =
     |       s"""
     |          |import org.apache.spark.sql.functions._
     |          |import org.elasticsearch.spark.sql._
     |          |import org.apache.spark.sql.DataFrame
     |          |var data = context.asInstanceOf[DataFrame]
     |          |data = data
     |       """ + withColumnsString +
     |         """
     |           |
     |           |data
     |         """.stripMargin
     | 
     |     val fun = compile[DataFrame](code) 
     |     val res = fun(df)
     |     res
     |   }


scala> val Columns = Seq("A", "B", "F", "G", "H")     
scala> val newCol =  Columns filterNot df.columns.toSeq.contains

scala> var cols = ""      
scala>  newCol.foreach{ name =>
     |  cols = ".withColumn(\""+ name + "\" , lit(null))" + cols
     | }

scala> val df1 = AddColumns(df,cols)
scala> df1.show
+---+----+----+----+----+----+----+----+
| id|   A|   B|   C|   D|   H|   G|   F|
+---+----+----+----+----+----+----+----+
|  1|toto|tata|titi|null|null|null|null|
|  2| bla| blo|null|null|null|null|null|
|  3|   b|   c|   a|   d|null|null|null|
+---+----+----+----+----+----+----+----+
...