Я пытаюсь разделить фрейм данных в соответствии со значениями одного (или более) столбца и вращать каждый результирующий фрейм данных независимо от остальных. А именно, учитывая входной фрейм данных:
val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
("lola","50","c","street c","argentina"), ("maria","60","d","street d","argentina"), ("joe","70","e","street e","argentina")
.toDF("name","age","company","address","country")
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
Мне нужно разделить записи по разным значениям столбца "страна". Для входного кадра данных разделение должно дать:
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
И мне также нужно повернуть столбцы «имя» и «возраст» под каждым фреймом данных таким образом, чтобы у каждого персона была своя компания и адрес, при этом оставаясь неизменными остальные столбцы. Желаемый выходной кадр данных будет выглядеть следующим образом:
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//|jimmy| 30| a|street a| germany|
//| tom| 20| b|street b| germany|
//| joe| 60| c|street c|argentina|
//| lola| 40| d|street d|argentina|
//|maria| 50| e|street e|argentina|
//+-----+---+-------+--------+---------+
окончательный порядок строк не имеет значения
Моя 1-я попытка (работает на Spark-shell)
Я попытался назначить уникальный идентификатор каждой строке, затем перетасовать нужные столбцы (имя и возраст) и соединить переупорядоченный кадр данных с остальной частью кадра данных, используя вспомогательное значение идентификатора. Основная проблема здесь - это использование collect (), которое может быть опасно для больших фреймов данных, и repartition (1), который почти противоречит распределенным вычислениям и Spark (он использовался, чтобы избежать исключений при архивировании rdds с разным количеством разделов) .
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType
// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")
val rotateCols = colsToRotate.map(col) :+ col(auxCol)
// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val splitValuesSchema = dfWithID.select(splitCols: _*).schema
// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
.map(row => spark.sparkContext.makeRDD(List(row)))
.map(rdd => spark.createDataFrame(rdd, splitValuesSchema))
val rotateIDCols = Array(auxCol) ++ colsToRotate
// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
.map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))
// random reorder the auxiliar id column (DFs with random ids)
val randIdDFs = splittedDFs
.map(df => df.select(auxCol).orderBy(rand()).toDF())
// get rdds with random ids
val randIdRdds = randIdDFs
.map(df => df.select(auxCol).rdd.map(row => row(0)))
// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
.map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
.map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))
val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
.createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
.reduce(_ union _).withColumnRenamed("rotated_id", "column2join")
// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
Отображение выходного кадра данных дает результат, аналогичный ожидаемому выходу, о котором говорилось выше (в основном это зависит от порядка функции rand ())
Я бы хотел по возможности избегать использования сборов и перераспределений и получить более функциональное решение.
Любые комментарии или идеи приветствуются!