Apache Spark перебирает столбцы DataFrame и применяет преобразование значений - PullRequest
0 голосов
/ 08 ноября 2018

Я читаю csv-файл в Spark DataFrame и определяю имена столбцов на основе заголовка cvs-файла:

val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", true)
  .option("inferSchema", true)
  .csv("users.csv")

прямо сейчас мне нужно преобразовать значения столбцов, например:

val modifedDf1 = df.withColumn("country", when(col("country") === "Italy", "[ITALY]").otherwise(col("country")))

val modifedDf2 = modifedDf1.withColumn("city", when(col("city") === "Milan", "[MILAN]").otherwise(col("city")))

Как видите, для изменения значения столбца мне нужно явно выбрать столбец withColumn("city".., а затем применить условие.

Сейчас мне нужно повторить этот код для каждого столбца, который я хочу изменить.

Можно ли переписать этот код для итерации каждого столбца в df DataFrame и применить следующее (в псевдокоде):

df.foreachColumn {
    if (col_name == 'country')) 
        then when(col_value === "Italy", "[ITALY]").otherwise(col_value)
    else if (col_name == 'city')) 
        then when(col_value === "Milan", "[MILAN]").otherwise(col_value)
}

Я буду признателен за пример в Scala.

ОБНОВЛЕНО

Это мой оригинальный df:

+------+------------------+--------------+-------------+
|name  |email             |phone         |country      |
+------+------------------+--------------+-------------+
|Mike  |mike@example.com  |+91-9999999999|Italy        |
|Alex  |alex@example.com  |+91-9999999998|France       |
|John  |john@example.com  |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+

У меня сейчас есть следующий код:

val columnsModify = df.columns.map(col).map(column => {
  val columnName = s"${column}"
  if (columnName == "country") {
    column as "[COUNTRY]"
  } else if (columnName == "email") {
    column as "(EMAIL)"
  } else {
    column as columnName
  }
})

, который может перебирать столбцы DataFrame и изменять их имена в соответствии с заданными условиями.

Это вывод:

+------+------------------+--------------+-------------+
|name  |(EMAIL)           |phone         |[COUNTRY]    |
+------+------------------+--------------+-------------+
|Mike  |mike@example.com  |+91-9999999999|Italy        |
|Alex  |alex@example.com  |+91-9999999998|France       |
|John  |john@example.com  |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+

Мне также нужно добавить логику преобразования для значений столбцов, что-то вроде этого (см. Прокомментированную строку ниже):

val columnsModify = df.columns.map(col).map(column => {
  val columnName = s"${column}"
  if (columnName == "country") {
    //when(column_value === "Italy", "[ITALY]").otherwise(column_value)
    column as "[COUNTRY]"
  } else if (columnName == "email") {
    column as "(EMAL)"
  } else {
    column as columnName
  }
})

и ожидаемый результат для этого скрипта должен быть:

+------+------------------+--------------+-------------+
|name  |(EMAL)            |phone         |[COUNTRY]    |
+------+------------------+--------------+-------------+
|Mike  |mike@example.com  |+91-9999999999|[ITALY]      |
|Alex  |alex@example.com  |+91-9999999998|France       |
|John  |john@example.com  |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+

Пожалуйста, покажите, как этого достичь.

Ответы [ 2 ]

0 голосов
/ 08 ноября 2018

Как насчет использования df.selectExpr

scala> :paste
// Entering paste mode (ctrl-D to finish)

 val sel2 = df.columns.map( x =>
 if(x=="country") "CASE WHEN country = 'Italy' THEN '[ITALY]' ELSE country  end as `[country]` "
 else if(x=="email") " email as `(EMAL)`"
 else x
 )

// Exiting paste mode, now interpreting.

sel2: Array[String] = Array(name, " email as `(EMAL)`", phone, "CASE WHEN country = 'Italy' THEN '[ITALY]' ELSE country  end as `[country]` ")

scala>  df.selectExpr(sel2:_*).show
+------+------------------+--------------+-------------+
|  name|            (EMAL)|         phone|    [country]|
+------+------------------+--------------+-------------+
|  Mike|  mike@example.com|+91-9999999999|      [ITALY]|
|  Alex|  alex@example.com|+91-9999999998|       France|
|  John|  john@example.com| +1-1111111111|United States|
|Donald|donald@example.com| +1-2222222222|United States|
+------+------------------+--------------+-------------+


scala>
0 голосов
/ 08 ноября 2018
val newCols = df.schema.map{
  column =>

    val colName = column.name

    colName match{
      case "country" => when(col(colName) === "Italy", "ITALY").otherwise(col(colName)).as("[COUNTRY]") 
      case "email" => col(colName).as("[EMAIL]")
      case _ => col(colName) 
    } 
}

df.select(newCols.head, newCols.tail: _*)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...