Выполните итерацию по столбцам в Spark Dataframe и обновите указанные значения - PullRequest
0 голосов
/ 06 мая 2018

Чтобы перебрать столбцы в Spark Dataframe, созданном из таблицы Hive, и обновить все вхождения нужных значений столбцов, я попробовал следующий код.

import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf

val a: DataFrame = spark.sql(s"select * from default.table_a")

    val column_names: Array[String] = a.columns

    val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date")) 

    val func = udf((value: String) => { if if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value } )

    val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}

При выполнении кода в спарк-оболочке я получил следующую ошибку.

scala> val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
<console>:35: error: value a is not a member of org.apache.spark.sql.DataFrame
       val b = {for (column: String <- required_column_list) { a.withColumn(column , isNull(a(column))) } a}
                                                                                                          ^ 

Также я попробовал следующее утверждение и не получил требуемый вывод.

val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }

Переменная b создается как блок вместо блока данных.

scala> val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
    b: Unit = ()

Пожалуйста, предложите любой лучший способ перебрать столбцы Dataframe и обновить все вхождения значений из столбцов или исправить, где я ошибаюсь. Любое другое решение также приветствуется. Заранее спасибо.

1 Ответ

0 голосов
/ 06 мая 2018

Вместо для цикла , вы должны пойти с foldLeft. И вам не нужна udf функция, when встроенная функция можно использовать

val column_names: Array[String] = a.columns

val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))

import org.apache.spark.sql.functions._
val b = required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))}

Надеюсь, ответ полезен

Пояснение:

В
required_columns.foldLeft(a){(tempdf, colName) => tempdf.withColumn(colName, when(col(colName) === "XXX" || col(colName) === "WWWW" || col(colName) === "TTTT", "NULL").otherwise(col(colName)))}

required_columns - это массив имен столбцов из a набора данных / набора данных с _date в качестве конечной строки, которые colName внутри withColumn

tempdf - исходный фрейм данных / набор данных, т.е. a

, когда внутри withColumn применяется функция , которая заменяет все значения XXX или WWWWW или TTTT на NULL

finally foldLeft возвращает все примененные преобразования данных в в b

...