Как запустить udf для каждого столбца в кадре данных? - PullRequest
0 голосов
/ 06 сентября 2018

У меня есть UDF:

val TrimText = (s: AnyRef) => {
    //does logic returns string
}

И датафрейм:

var df = spark.read.option("sep", ",").option("header", "true").csv(root_path + "/" + file)

Я хотел бы выполнить TrimText для каждого значения в каждом столбце в кадре данных.

Однако проблема в том, что у меня динамическое количество столбцов. Я знаю, что могу получить список столбцов по df.columns. Но я не уверен, как это поможет мне с моей проблемой. Как я могу решить эту проблему?

TLDR Issue - выполнение UDF для каждого столбца в кадре данных, когда в кадре данных есть неизвестное количество столбцов


Попытка использования:

df.columns.foldLeft( df )( (accDF, c) =>
  accDF.withColumn(c, TrimText(col(c)))
)

Выдает эту ошибку:

error: type mismatch;
 found   : String
 required: org.apache.spark.sql.Column
accDF.withColumn(c, TrimText(col(c)))

TrimText предполагается, что возвращает строку, и ожидает, что ввод будет значением в столбце. Таким образом, он будет стандартизировать каждое значение в каждой строке всего фрейма данных.

Ответы [ 3 ]

0 голосов
/ 06 сентября 2018
val a = sc.parallelize(Seq(("1 "," 2"),(" 3","4"))).toDF()
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
def TrimText(s: Column): Column = {
//does logic returns string
  trim(s)
}
a.select(a.columns.map(c => TrimText(col(c))):_*).show
0 голосов
/ 07 сентября 2018
>> I would like to perform TrimText on every value in every column in the dataframe.
>> I have a dynamic number of columns.

когда доступна функция sql для обрезки, почему UDF, может увидеть ниже подходящий вам код?

import org.apache.spark.sql.functions._

spark.udf.register("TrimText", (x:String) =>  ..... )

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols2 = df2.columns.toSet
df2.createOrReplaceTempView("table1")

val query = "select " + buildcolumnlst(cols2) + " from table1 "
println(query)
val dfresult = spark.sql(query)
dfresult.show()

def buildcolumnlst(myCols: Set[String]) = {
  myCols.map(x => "TrimText(" + x + ")" + " as " + x).mkString(",") 
}

Результаты

select trim(age) as age,trim(education) as education,trim(income) as income from table1 
+---+---------+-------+
|age|education| income|
+---+---------+-------+
| 26|     true|60000.0|
| 32|    false|35000.0|
+---+---------+-------+
0 голосов
/ 06 сентября 2018

Вы можете использовать foldLeft, чтобы просмотреть список столбцов, чтобы итеративно применить withColumn к DataFrame, используя ваш UDF:

df.columns.foldLeft( df )( (accDF, c) =>
  accDF.withColumn(c, TrimText(col(c)))
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...