спарк группа и сериализация списка - PullRequest
0 голосов
/ 24 апреля 2018

Я хочу объединить набор данных (идентификатор, оценка, поле1, поле2, поле3) по идентификатору и включить другие столбцы, отсортированные по счету, в какой-то список / столбец, чтобы их можно было сериализовать в следующий объект.

collect_set занимает только один столбец, поэтому я не уверен, как все поля в столбец за исключением concat.Мне также нужно ограничить колонку списка тремя тестами.Полученный набор данных будет выглядеть примерно так: Integer id, Array(List).

id, [[score, field1, field2, field3], [score, field1, field2, field3], [score, field1, field2, field3]]

class Student {
    private int id;
    private List<Test> tests;
}

class Test {
    private int score;
    private String field1;
    private String field2;
    private String field3;
}

Например:

id1,99,"just","some","text"
id1,95,"just","more","text"
id1,75,"still","more","text"
id1,88,"yet","more","text"

приведет к:

id1,[[99,"just","some","text"], [95,"just","more","text"], [88,"yet","more","text"]]

Это отличается отранее задаваемые вопросы в том смысле, что он включает в себя сортировку и ограничение вывода, поэтому для ответа требовалась функция Windows, а для ответов на другие вопросы - нет.

1 Ответ

0 голосов
/ 25 апреля 2018

Вы можете использовать Window функции и struct:

val df = spark.createDataFrame(
    Seq((1, 99, "a"), (1, 95, "b"), (1, 75, "c"), (1, 88, "d"))
  ).toDF("id", "score", "field")

df.show
+---+-----+-----+
| id|score|field|
+---+-----+-----+
|  1|   99|    a|
|  1|   95|    b|
|  1|   75|    c|
|  1|   88|    d|
+---+-----+-----+

val w = Window.partitionBy("id").orderBy($"score".desc)

val res = df.withColumn("row", row_number().over(w))
  .filter($"row" <= 3)
  .groupBy("id")
  .agg(collect_list(struct("score", "field")).as("data"))

res.show(false)
+---+------------------------+                                                  
|id |data                    |
+---+------------------------+
|1  |[[99,a], [95,b], [88,d]]|
+---+------------------------+

res.printSchema
root
 |-- id: integer (nullable = false)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- score: integer (nullable = false)
 |    |    |-- field: string (nullable = true)

ПРИМЕЧАНИЕ : убедитесь, что collect_list поддерживает порядок на score. Если нет (и если вы заботитесь о заказе), вам нужно будет создать udf, который будет использовать ваш список data

Документы:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...