Чтобы перебрать столбцы в Spark Dataframe, созданном из таблицы Hive, и обновить все вхождения нужных значений столбцов, я попробовал следующий код.
import org.apache.spark.sql.{DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
val a: DataFrame = spark.sql(s"select * from default.table_a")
val column_names: Array[String] = a.columns
val required_columns: Array[String] = column_names.filter(name => name.endsWith("_date"))
val func = udf((value: String) => { if if (value == "XXXX" || value == "WWWW" || value == "TTTT") "NULL" else value } )
val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
При выполнении кода в спарк-оболочке я получил следующую ошибку.
scala> val b = {for (column: String <- required_columns) { a.withColumn(column , func(a(column))) } a}
<console>:35: error: value a is not a member of org.apache.spark.sql.DataFrame
val b = {for (column: String <- required_column_list) { a.withColumn(column , isNull(a(column))) } a}
^
Также я попробовал следующее утверждение и не получил требуемый вывод.
val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
Переменная b создается как блок вместо блока данных.
scala> val b = for (column: String <- required_columns) { a.withColumn(column , func(a(column))) }
b: Unit = ()
Пожалуйста, предложите любой лучший способ перебрать столбцы Dataframe и обновить все вхождения значений из столбцов или исправить, где я ошибаюсь. Любое другое решение также приветствуется. Заранее спасибо.