Добавить агрегированные столбцы в сводку без объединения - PullRequest
0 голосов
/ 28 февраля 2019

Учитывая таблицу:

df=sc.parallelize([(1,1,1),(5,0,2),(27,1,1),(1,0,3),(5,1,1),(1,0,2)]).toDF(['id', 'error', 'timestamp'])
df.show()

+---+-----+---------+
| id|error|timestamp|
+---+-----+---------+
|  1|    1|        1|
|  5|    0|        2|
| 27|    1|        1|
|  1|    0|        3|
|  5|    1|        1|
|  1|    0|        2|
+---+-----+---------+

Я хотел бы сделать сводку для столбца timestamp, сохраняя некоторую другую агрегированную информацию из исходной таблицы.Интересующий меня результат может быть достигнут с помощью

df1=df.groupBy('id').agg(sf.sum('error').alias('Ne'),sf.count('*').alias('cnt'))
df2=df.groupBy('id').pivot('timestamp').agg(sf.count('*')).fillna(0)
df1.join(df2, on='id').filter(sf.col('cnt')>1).show()

с полученной таблицей:

+---+---+---+---+---+---+
| id| Ne|cnt|  1|  2|  3|
+---+---+---+---+---+---+
|  5|  1|  2|  1|  1|  0|
|  1|  1|  3|  1|  1|  1|
+---+---+---+---+---+---+

Однако с упомянутым решением есть как минимум две проблемы:

  1. Я фильтрую по cnt в конце скрипта.Если бы я смог сделать это в начале, я бы мог избежать почти всей обработки, потому что большая часть данных удаляется с помощью этой фильтрации.Есть ли способ сделать это, кроме методов collect и isin?
  2. Я делаю groupBy на id два раза.Во-первых, для агрегирования некоторых столбцов мне нужно в результатах, а во второй раз, чтобы получить сводные столбцы.Наконец, мне нужно join, чтобы объединить эти столбцы.Я чувствую, что, безусловно, упускаю какое-то решение, потому что это может быть возможно сделать только с одним groubBy и без join, но я не могу понять, как это сделать.

1 Ответ

0 голосов
/ 01 марта 2019

Я думаю, что вы не можете обойти объединение, потому что стержню понадобятся значения меток времени, и первая группа не должна их учитывать.Поэтому, если вам нужно создать значения NE и cnt, вы должны сгруппировать фрейм данных только по id, что приведет к потере метки времени, если вы хотите сохранить значения в столбцах, вы должны сделать сводку, как высделал отдельно и присоедините его обратно.

Единственное улучшение, которое можно сделать, это перенести фильтр в создание df1.Как вы сказали, это уже может улучшить производительность, поскольку df1 должен быть намного меньше после фильтрации ваших реальных данных.

from pyspark.sql.functions import *

df=sc.parallelize([(1,1,1),(5,0,2),(27,1,1),(1,0,3),(5,1,1),(1,0,2)]).toDF(['id', 'error', 'timestamp'])
df1=df.groupBy('id').agg(sum('error').alias('Ne'),count('*').alias('cnt')).filter(col('cnt')>1)
df2=df.groupBy('id').pivot('timestamp').agg(count('*')).fillna(0)
df1.join(df2, on='id').show()

Выход:

+---+---+---+---+---+---+
| id| Ne|cnt|  1|  2|  3|
+---+---+---+---+---+---+
|  5|  1|  2|  1|  1|  0|
|  1|  1|  3|  1|  1|  1|
+---+---+---+---+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...