Разбейте фрейм данных Spark по некоторым значениям столбца, а затем поверните каждый сгенерированный фрейм данных независимо от других. - PullRequest
0 голосов
/ 03 июля 2018

Я пытаюсь разделить фрейм данных в соответствии со значениями одного (или более) столбца и вращать каждый результирующий фрейм данных независимо от остальных. А именно, учитывая входной фрейм данных:

val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
                  ("lola","50","c","street c","argentina"), ("maria","60","d","street d","argentina"), ("joe","70","e","street e","argentina")
                  .toDF("name","age","company","address","country")

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+

Мне нужно разделить записи по разным значениям столбца "страна". Для входного кадра данных разделение должно дать:

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//+-----+---+-------+--------+---------+

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+

И мне также нужно повернуть столбцы «имя» и «возраст» под каждым фреймом данных таким образом, чтобы у каждого персона была своя компания и адрес, при этом оставаясь неизменными остальные столбцы. Желаемый выходной кадр данных будет выглядеть следующим образом:

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|jimmy| 30|      a|street a|  germany|
//|  tom| 20|      b|street b|  germany|
//|  joe| 60|      c|street c|argentina|
//| lola| 40|      d|street d|argentina|
//|maria| 50|      e|street e|argentina|
//+-----+---+-------+--------+---------+

окончательный порядок строк не имеет значения

Моя 1-я попытка (работает на Spark-shell)

Я попытался назначить уникальный идентификатор каждой строке, затем перетасовать нужные столбцы (имя и возраст) и соединить переупорядоченный кадр данных с остальной частью кадра данных, используя вспомогательное значение идентификатора. Основная проблема здесь - это использование collect (), которое может быть опасно для больших фреймов данных, и repartition (1), который почти противоречит распределенным вычислениям и Spark (он использовался, чтобы избежать исключений при архивировании rdds с разным количеством разделов) .

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType

// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")
val rotateCols = colsToRotate.map(col) :+ col(auxCol)

// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())

val splitValuesSchema = dfWithID.select(splitCols: _*).schema

// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
  .map(row => spark.sparkContext.makeRDD(List(row)))
  .map(rdd => spark.createDataFrame(rdd, splitValuesSchema))

val rotateIDCols = Array(auxCol) ++ colsToRotate

// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
  .map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))

// random reorder the auxiliar id column (DFs with random ids)
val randIdDFs = splittedDFs
  .map(df => df.select(auxCol).orderBy(rand()).toDF())

// get rdds with random ids
val randIdRdds = randIdDFs
  .map(df => df.select(auxCol).rdd.map(row => row(0)))

// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
  .map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
  .map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))

val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
  .createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
  .reduce(_ union _).withColumnRenamed("rotated_id", "column2join")

// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order

Отображение выходного кадра данных дает результат, аналогичный ожидаемому выходу, о котором говорилось выше (в основном это зависит от порядка функции rand ())

Я бы хотел по возможности избегать использования сборов и перераспределений и получить более функциональное решение.

Любые комментарии или идеи приветствуются!

1 Ответ

0 голосов
/ 04 июля 2018

Я продолжаю пытаться найти лучшее, более четкое и более функциональное решение, максимально удаляя неэффективные вызовы (перераспределение и некоторые сборы). Я добавил вспомогательный метод для индексации строк данных, чтобы иметь возможность объединять несвязанные части (столбцы или dfs, которые не могут быть объединены ни одним общим столбцом). Это моя текущая разработка, которая также удаляет множественные преобразования между rdds и dataframes и выглядит более читабельной и понятной.

Я надеюсь, что это может помочь кому-то с такими же проблемами .

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// auxiliar method to index row in dataframes
def addRowIndex(df: DataFrame) = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
  StructType(df.schema.fields :+ StructField("index", LongType, false))
)

// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())

val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)

// get an array of dfs with the different values of the splitter column(s)
// --assuming there will not be too much different values in the splitter column--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()

// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map(filterRow => filterRow.getValuesMap(colToSplit)
  .foldLeft(dfWithID) {
    (df, filterField) =>
      df.filter(col(filterField._1) === filterField._2)
        .select(rotateIDCols: _*)
  })

// get and random reorder the aux id column for each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())

// remove aux column for each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))

val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)

// index row of dfs with columns to rotate and dfs with random ids
val indexedDfsTuples = dfsTuples.map {
  case (colsDf, idsDf) => (addRowIndex(colsDf), addRowIndex(idsDf))
}

// join reordered-ids dfs and cols to rotate dataframes by the index
val reorderedDfs = indexedDfsTuples.map {
  case (df1, df2) => df1.join(df2, Seq("index"))
    .drop("index").withColumnRenamed(auxCol, "column2join")
}

// union both dataframes to create the rotated df
reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }

// get the rest of the columns to get the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join the rotated and no rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...