Spark 2.3 с Java8 преобразует строку в столбцы - PullRequest
1 голос
/ 26 сентября 2019

Я новичок в Spark 2.4 с Java 8. Мне нужна помощь.Вот пример экземпляров:

Источник DataFrame

+--------------+
| key | Value  |
+--------------+
| A   | John   |
| B   | Nick   |
| A   | Mary   |
| B   | Kathy  |
| C   | Sabrina|
| B   | George |
+--------------+

Meta DataFrame

+-----+
| key |
+-----+
| A   |
| B   |
| C   |
| D   |
| E   |
| F   |
+-----+

Я хотел бы преобразовать его в следующее: Имена столбцов из Meta Dataframe иСтроки будут преобразованы на основе Source Dataframe

+-----------------------------------------------+
| A    | B      | C       | D     | E    | F    |
+-----------------------------------------------+
| John | Nick   | Sabrina | null  | null | null |
| Mary | Kathy  | null    | null  | null | null |
| null | George | null    | null  | null | null |
+-----------------------------------------------+

Необходимо написать код Spark 2.3 с Java8.Ценю вашу помощь.

1 Ответ

3 голосов
/ 26 сентября 2019

Чтобы сделать вещи более понятными (и легко воспроизводимыми), давайте определим кадры данных:

val df1 = Seq("A" -> "John", "B" -> "Nick", "A" -> "Mary", 
              "B" -> "Kathy", "C" -> "Sabrina", "B" -> "George")
          .toDF("key", "value")
val df2 = Seq("A", "B", "C", "D", "E", "F").toDF("key")

Из того, что я вижу, вы пытаетесь создать один столбец по значению в столбце key в df2.Эти столбцы должны содержать все значения столбца value, связанные с key, именующим столбец.Если мы возьмем пример, первое значение столбца A должно быть значением первого вхождения A (если оно существует, в противном случае - ноль): "John".Его второе значение должно быть значением второго вхождения A: "Mary".Третьего значения не существует, поэтому третье значение столбца должно быть null.

Я подробно описал это, чтобы показать, что нам нужно понятие ранга значений для каждого ключа (оконная функция), и сгруппировать поэто понятие ранга.Это будет выглядеть следующим образом:

import org.apache.spark.sql.expressions.Window
val df1_win = df1
    .withColumn("id", monotonically_increasing_id)
    .withColumn("rank", rank() over Window.partitionBy("key").orderBy("id"))
// the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
val keys = df2.collect.map(_.getAs[String](0)).sorted

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys) 
    .agg(first('value))
    .orderBy("rank")
    //.drop("rank") // I keep here it for clarity
    .show()
+----+----+------+-------+----+----+----+                                       
|rank|   A|     B|      C|   D|   E|   F|
+----+----+------+-------+----+----+----+
|   1|John|  Nick|Sabrina|null|null|null|
|   2|Mary| Kathy|   null|null|null|null|
|   3|null|George|   null|null|null|null|
+----+----+------+-------+----+----+----+

Вот тот же код на Java

Dataset<Row> df1_win = df1
    .withColumn("id", functions.monotonically_increasing_id())
    .withColumn("rank", functions.rank().over(Window.partitionBy("key").orderBy("id")));
    // the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
// Note that it is a list of objects, to match the (strange) signature of pivot
List<Object> keys = df2.collectAsList().stream()
    .map(x -> x.getString(0))
    .sorted().collect(Collectors.toList());

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys)
    .agg(functions.first(functions.col("value")))
    .orderBy("rank")
    // .drop("rank") // I keep here it for clarity
    .show();
...