добавьте несколько столбцов к существующему фрейму - PullRequest
3 голосов
/ 09 апреля 2019

Мне нужно добавить несколько столбцов в существующий фрейм данных искры, где имена столбцов приведены в списке, предполагая, что значения для новых столбцов постоянны, например, для заданных входных столбцов и фрейма данных

val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))

и после добавления обоихстолбцы, при условии, что постоянные значения равны «val1» для col1 и «val2» для col2, кадр выходных данных должен быть

+-----+---+-------+------+
|   _1| _2|col1   |col2|
+-----+---+-------+------+
|  one|  1|val1   |val2|
|  two|  2|val1   |val2|
|three|  3|val1   |val2|
| four|  4|val1   |val2|
+-----+---+-------+------+

я написал функцию для добавления столбцов

def appendColumns (cols: List[String], ds: DataFrame): DataFrame = {

            cols match {

                case Nil => ds
                case h :: Nil => appendColumns(Nil, ds.withColumn(h, lit(h)))
                case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))

            }
        }

IsЕсть ли лучший способ и более функциональный способ сделать это.

спасибо

Ответы [ 2 ]

4 голосов
/ 09 апреля 2019

Да, есть лучший и более простой способ.По сути, вы делаете столько звонков на withColumn, сколько у вас есть столбцов.С большим количеством колонок, катализатором, двигатель, который оптимизирует запросы зажигания, может чувствовать себя немного перегруженным (у меня был опыт в прошлом с подобным вариантом использования).Я даже видел, как это вызывает OOM на драйвере при экспериментировании с тысячами столбцов.Чтобы избежать нагрузки на катализатор (и писать меньше кода ;-)), вы можете просто использовать select, как показано ниже, чтобы сделать это в одной команде spark:

val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// let's assume that we have a map that associates column names to their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// Let's create the new columns from the map
val newCols = columnMap.keys.map(k => lit(columnMap(k)) as k)
// selecting the old columns + the new ones
data.select(data.columns.map(col) ++ newCols : _*).show
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+
3 голосов
/ 09 апреля 2019

В отличие от рекурсии, более общий подход с использованием foldLeft был бы более общим для ограниченного числа столбцов. Использование Блокнота данных:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import spark.implicits._

val columnNames = Seq("c3","c4")
val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")

def addCols(df: DataFrame, columns: Seq[String]): DataFrame = {
    columns.foldLeft(df)((acc, col) => {
      acc.withColumn(col, lit(col)) })
}

val df2 = addCols(df, columnNames)
df2.show(false)

возвращается:

+-----+---+---+---+
|c1   |c2 |c3 |c4 |
+-----+---+---+---+
|one  |1  |c3 |c4 |
|two  |2  |c3 |c4 |
|three|3  |c3 |c4 |
|four |4  |c3 |c4 |
+-----+---+---+---+

Пожалуйста, остерегайтесь следующего: https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015 хотя и в несколько ином контексте, а другой ответ на это ссылается через выборочный подход.

...