PySpark объединяет столбцы после пивота - PullRequest
1 голос
/ 18 марта 2020

Для следующего примера DataFrame:

df = spark.createDataFrame(
[
    ('2017-01-01', 'A', 1),
    ('2017-01-01', 'B', 2),
    ('2017-01-01', 'C', 3),
    ('2017-01-02', 'A', 4),
    ('2017-01-02', 'B', 5),
    ('2017-01-02', 'C', 6),
    ('2017-01-03', 'A', 7),
    ('2017-01-03', 'B', 8),
    ('2017-01-03', 'C', 9),
],
('date', 'type', 'value')
)

Я хотел бы преобразовать его, чтобы столбцы были равны всем уникальным "типам" (A, B и C).

В настоящее время я обнаружил, что этот код работает ближе всего к тому, чего я хотел бы достичь:

 df.groupby("date", "type").pivot("type").sum().orderBy("date").show()

+----------+----+----+----+----+
|      date|type|   A|   B|   C|
+----------+----+----+----+----+
|2017-01-01|   C|null|null|   3|
|2017-01-01|   A|   1|null|null|
|2017-01-01|   B|null|   2|null|
|2017-01-02|   B|null|   5|null|
|2017-01-02|   C|null|null|   6|
|2017-01-02|   A|   4|null|null|
|2017-01-03|   A|   7|null|null|
|2017-01-03|   C|null|null|   9|
|2017-01-03|   B|null|   8|null|
+----------+----+----+----+----+

Проблема в том, что у меня все еще слишком много строк (содержащих все "нуль").

То, что я хотел бы получить:

+----------+---+---+---+
|      date|  A|  B|  C|
+----------+---+---+---+
|2017-01-01|  1|  2|  3|
|2017-01-02|  4|  5|  6|
|2017-01-03|  7|  8|  9|
+----------+---+---+---+

Ака, я хотел бы что-то, что имеет функциональность, аналогичную pandas .DataFrame.unstack ().

Если у кого-либо есть какие-либо Советы о том, как я могу добиться этого в PySpark, было бы здорово.

1 Ответ

1 голос
/ 18 марта 2020

Вам нужно создать другую группу по столбцу "date", затем выбрать max значения из A,B,C.

Example:

df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(max(col("A")).alias("A"),max(col("B")).

#+----------+---+---+---+
#|      date|  A|  B|  c|
#+----------+---+---+---+
#|2017-01-01|  1|  2|  3|
#|2017-01-02|  4|  5|  6|
#|2017-01-03|  7|  8|  9|
#+----------+---+---+---+

# dynamic way 
aggregate = ["A","B","C"]
funs=[max]
exprs=[f(col(c)).alias(c) for f in funs for c in aggregate]
df.groupby("date", "type").pivot("type").sum().orderBy("date").groupBy("date").agg(*exprs).show()

#+----------+---+---+---+
#|      date|  A|  B|  c|
#+----------+---+---+---+
#|2017-01-01|  1|  2|  3|
#|2017-01-02|  4|  5|  6|
#|2017-01-03|  7|  8|  9|
#+----------+---+---+---+
...