Как изменить заголовок фрейма данных на основе конкретного значения? - PullRequest
1 голос
/ 22 января 2020

Предположим, у нас есть следующий CSV-файл, например

fname,age,class,dob

Второе имя CSV-файла, например

f_name,user_age,class,DataofBith

Я пытаюсь сделать его общим заголовком для всех CSV-файлов, которые возвращают один и тот же кадр данных заголовка, как в стандартном порядке, как

first_name,age,class,dob
val df2 = df.withColumnRenamed("DateOfBirth","DateOfBirth").withColumnRenamed("fname","name")
df2.printSchema()

Но этот способ не является обобщенным c. Можем ли мы сделать это динамически c для всех CSV согласно стандартному преобразованию, подобному DataFrame из CSV в fname,f_name, его следует преобразовать в имя?

Ответы [ 3 ]

2 голосов
/ 22 января 2020

Вы можете использовать Список схем, затем Итерировать поверх схемы, как показано ниже -

Approach :1
val df= Seq((1,"goutam","kumar"),(2,"xyz","kumar")).toDF("id","fname","lname")
val schema=Seq("id"->"sid","fname"->"sfname","lname"->"slname")
val mapedSchema = schema.map(x=>df(x._1).as(x._2))
df.select(mapedSchema :_*)

при чтении CSV дают "option("header", false)", после чего вы можете прочитать о сопоставлении старой схемы с новой схемой.

Approach :2
val schema=Seq("sid","sfname","slname")
val mapedSchema=data.columns.zip(schema)
val mapedSchemaWithDF = mapedSchema.map(x=>df(x._1).as(x._2))
df.select(mapedSchemaWithDF:_*)
1 голос
/ 23 января 2020

Вы можете использовать простой select в сочетании с Scala Map. Проще обрабатывать преобразования столбцов через словарь (карта), ключ которого будет старое имя и значение новое имя .

Позволяет сначала создать два наборы данных, как вы их описали:

val df1 = Seq(
  ("toto", 23, "g", "2010-06-09"),
  ("bla", 35, "s", "1990-10-01"),
  ("pino", 12, "a", "1995-10-05")
).toDF("fname", "age", "class", "dob")

val df2 = Seq(
  ("toto", 23, "g", "2010-06-09"),
  ("bla", 35, "s", "1990-10-01"),
  ("pino", 12, "a", "1995-10-05")
).toDF("f_name", "user_age", "class", "DataofBith")

Затем мы создали функцию Scala с именем transform, которая принимает два аргумента, цель df и mapping, которая содержит детали преобразований:


val mapping = Map(
  "fname" -> "first_name",
  "f_name" -> "first_name",
  "user_age" -> "age",
  "DataofBith" -> "dob"
)

def transform(df: DataFrame, mapping: Map[String, String]) : DataFrame = {
  val keys = mapping.keySet
  val cols = df.columns.map{c => 
    if(keys.contains(c))
      df(c).as(mapping(c))
    else
      df(c)
  }

  df.select(cols:_*)
}

Функция просматривает указанные столбцы, сначала проверяя, существует ли текущий столбец в mapping. Если это так, он переименовывает, используя соответствующее значение из словаря, в противном случае столбец остается нетронутым. Обратите внимание, что это просто переименует столбец (через псевдоним), поэтому мы не ожидаем, что это повлияет на производительность.

Наконец, несколько примеров:

val newDF1 = transform(df1, mapping)
newDF1.show

// +----------+---+-----+----------+
// |first_name|age|class|       dob|
// +----------+---+-----+----------+
// |      toto| 23|    g|2010-06-09|
// |       bla| 35|    s|1990-10-01|
// |      pino| 12|    a|1995-10-05|
// +----------+---+-----+----------+


val newDF2 = transform(df2, mapping)
newDF2.show

// +----------+---+-----+----------+
// |first_name|age|class|       dob|
// +----------+---+-----+----------+
// |      toto| 23|    g|2010-06-09|
// |       bla| 35|    s|1990-10-01|
// |      pino| 12|    a|1995-10-05|
// +----------+---+-----+----------+
1 голос
/ 22 января 2020

Функция withColumnRenamed работает также, если столбец отсутствует в кадре данных. Следовательно, вы можете go впереди, прочитать все кадры данных и применить одинаковые логики переименования c везде и объединить их все позже.

import org.apache.spark.sql.DataFrame

def renaming(df: DataFrame): DataFrame = {
   df.withColumnRenamed("dob", "DateOfBirth")
     .withColumnRenamed("fname", "name")
     .withColumnRenamed("f_name", "name")
     .withColumnRenamed("user_age", "age")
 // append more renaming functions here
}

val df1 = renaming(spark.read.csv("...your path here"))

val df2 = renaming(spark.read.csv("...another path here"))

val result = df1.unionAll(df2)

result будет иметь одинаковую схему (DateOfBirth, name, age) в этом случае .

Редактировать:

После вашего ввода, если я правильно понимаю, что вы должны делать, что по этому поводу?

val df1 = spark.read.csv("...your path here").toDF("name", "age", "class", "born_date")

val df2 = spark.read.csv("...another path here").toDF("name", "age", "class", "born_date")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...