Добавление заголовка и информации о трейлере stati c в спарк - PullRequest
0 голосов
/ 25 марта 2020

У меня есть датафрейм, который записывается в расположение выходной папки с разделителем файлов в качестве разделителя канала. Перед написанием мне нужно добавить заголовок и трейлер в существующий фрейм данных.

Фактическая полезная нагрузка:

+--------------------+---+---+---+---+----+----+----------+---+
|                  _1| _2| _3| _4| _5|  _6|  _7|        _8| _9|
+--------------------+---+---+---+---+----+----+----------+---+
|chevrolet chevell...| 18|  8|307|130|3504|12.0|1970-01-01|USA|
|   buick skylark 320| 15|  8|350|165|3693|11.5|1970-01-01|USA|
|  plymouth satellite| 18|  8|318|150|3436|11.0|1970-01-01|USA|
|       amc rebel sst| 16|  8|304|150|3433|12.0|1970-01-01|USA|
|         ford torino| 17|  8|302|140|3449|10.5|1970-01-01|USA|
|    ford galaxie 500| 15|  8|429|198|4341|10.0|1970-01-01|USA|
|    chevrolet impala| 14|  8|454|220|4354| 9.0|1970-01-01|USA|
|   plymouth fury iii| 14|  8|440|215|4312| 8.5|1970-01-01|USA|
|    pontiac catalina| 14|  8|455|225|4425|10.0|1970-01-01|USA|
|  amc ambassador dpl| 15|  8|390|190|3850| 8.5|1970-01-01|USA|
+--------------------+---+---+---+---+----+----+----------+---+

Заголовок

+-------+----------+-------+---+
|     _1|        _2|     _3| _4|
+-------+----------+-------+---+
|Samsung|Galaxy S10|Android| 12|
+-------+----------+-------+---+

Нижний колонтитул:

+----+---+----------+---+
|  _1| _2|        _3| _4|
+----+---+----------+---+
|alex| 25|California| US|
+----+---+----------+---+

Не обязательно размер столбцов в полезной нагрузке равен размеру столбцов для верхнего и нижнего колонтитула. Я преобразовал все кадры данных в rdd как следующие

val payloadRDD = payload.rdd
val headerRDD = header.rdd 
val trailerRDD = trailer.rdd

Затем я выполнил объединение всех трех кадров как следующие

val resultRDD = spark.sparkContext.union(headerRDD,payloadRDD,trailerRDD).collect()

Я не могу преобразовать следующее в dataframe перед записью на диск.

1 Ответ

1 голос
/ 25 марта 2020

Union можно выполнять только для таблиц с одинаковым количеством столбцов .

Можно добавить пропущенные столбцы типа NullType до union.

def unionFrames(dfs: Seq[DataFrame]): DataFrame = {
    dfs match {
      case Nil => session.emptyDataFrame // or throw an exception?
      case x :: Nil => x
      case _ =>
        //Preserving Column order from left to right DF's column order
        val allColumns = dfs.foldLeft(collection.mutable.ArrayBuffer.empty[String])((a, b) => a ++ b.columns).distinct

        val appendMissingColumns = (df: DataFrame) => {
          val columns = df.columns.toSet
          df.select(allColumns.map(c => if (columns.contains(c)) col(c) else lit(null).as(c)): _*)
        }

        dfs.tail.foldLeft(appendMissingColumns(dfs.head))((a, b) => a.union(appendMissingColumns(b)))
    }

Примечание : вам не нужно преобразовывать DataFrame в RDD, вместо этого выполните union для DataFrame напрямую.

...