Как сводить таблицу без функции агрегирования в pyspark - PullRequest
0 голосов
/ 21 марта 2020

У меня есть такой фрейм данных в pyspark.

|--------------|----------------|---------------|
|   col_1      |     col_2      |   col_3       |
|-----------------------------------------------|
|       1      |       A        |     abd       |
|-----------------------------------------------|
|       1      |       B        |     acd       |
|-----------------------------------------------|
|       1      |       A        |     bcd       |
|-----------------------------------------------|
|       1      |       B        |     ceg       |
------------------------------------------------|
|       2      |       A        |     cgs       |
|-----------------------------------------------|
|       2      |       B        |     bsc       |
|-----------------------------------------------|
|       2      |       A        |     iow       |
|-----------------------------------------------|

Я хотел бы включить в него таблицу.

|--------------|----------------|---------------|
|   col_1      |       A        |      B        |
|-----------------------------------------------|
|       1      |       abd      |     acd       |
|-----------------------------------------------|
|       1      |       bcd      |     ceg       |
|-----------------------------------------------|
|       2      |       cgs      |     bsc       |
|-----------------------------------------------|
|       2      |       iow      |     null      |
------------------------------------------------|

Как мне это сделать? Функция pivot для фрейма данных pyspark требует агрегатной функции, и в моем случае col_1 также не уникальна.

1 Ответ

0 голосов
/ 23 марта 2020

Вот способ, которым вы можете получить целевой результат:

    import pyspark.sql.functions as F

    df = df.groupBy('col_1').pivot("col_2").agg(F.collect_list("col_3"))

    cols = df.columns[1:]
    res = df.select("col_1",F.explode(cols[0]).alias(cols[0])).withColumn("id", F.monotonically_increasing_id())

    for name in cols[1:]:
        res = res.join(df.select("col_1",F.explode(name).alias(name)).withColumn("id", F.monotonically_increasing_id()),on = ["id","col_1"], how = "outer")

    res = res.orderBy("col_1").drop("id")
...