Использование класса case для переименования разделенных столбцов с помощью Spark Dataframe - PullRequest
0 голосов
/ 06 мая 2018

Я разделяю «split_column» на еще пять столбцов в соответствии со следующим кодом. Однако я хотел, чтобы эти новые столбцы были переименованы, чтобы у них были некоторые значимые имена (скажем, new_renamed1 "," new_renamed2 "," new_renamed3 "," new_renamed4 "," new_renamed5 "в этом примере)

val df1 = df.withColumn("new_column", split(col("split_column"), "\\|")).select(col("*") +: (0 until 5).map(i => col("new_column").getItem(i).as(s"newcol$i")): _*).drop("split_column","new_column")

val new_columns_renamed = Seq("....., "new_renamed1", "new_renamed2", "new_renamed3", "new_renamed4", "new_renamed5") 

val df2 = df1.toDF(new_columns_renamed: _*)

Однако проблема с этим подходом состоит в том, что некоторые из моих разделений могут иметь более пятидесяти новых строк. При таком подходе переименования было бы больно обнаруживать небольшую опечатку (например, лишнюю запятую, пропуская двойные кавычки).

Есть ли способ переименовать столбцы с помощью класса case, как показано ниже?

case class SplittedRecord (new_renamed1: String, new_renamed2: String, new_renamed3: String, new_renamed4: String, new_renamed5: String)

Обратите внимание, что в реальном сценарии имена не будут выглядеть как new_renamed1, new_renamed2, ......, new_renamed5, они будут совершенно другими.

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Один из способов использовать кейс класса

case class SplittedRecord (new_renamed1: String, new_renamed2: String, new_renamed3: String, new_renamed4: String, new_renamed5: String)

через udf функцию как

import org.apache.spark.sql.functions._
def splitUdf = udf((array: Seq[String])=> SplittedRecord(array(0), array(1), array(2), array(3), array(4)))

df.withColumn("test", splitUdf(split(col("split_column"), "\\|"))).drop("split_column")
    .select(col("*"), col("test.*")).drop("test")
0 голосов
/ 06 мая 2018

Вы можете попробовать что-то вроде этого:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

import org.apache.spark.sql.Encoders
val names = Encoders.product[SplittedRecord].schema.fieldNames

names.zipWithIndex
  .foldLeft(df.withColumn("new_column", split(col("split_column"), "\\|")))
  { case (df, (c, i)) => df.withColumn(c, $"new_column"(i)) }
...