Искажение данных с помощью countDistinct - PullRequest
1 голос
/ 09 июля 2020

У меня есть PySpark DataFrame с 3 столбцами: «клиент», «продукт», «дата». Я хочу запустить операцию groupBy:

df.groupBy("product", "date").agg(F.countDistinct("client"))

Итак, я хочу подсчитать количество клиентов, купивших продукт за каждый день. Это вызывает огромный перекос данных (фактически, это вызывает ошибку из-за памяти). Я изучаю техники соления. Как я понял, его можно использовать с 'sum' или 'count', добавляя новый столбец в groupBy и выполняя вторую агрегацию, но я не вижу, как их применить в этом случае из-за метода агрегации countDistinct.

Как мне применить его в этом случае?

1 Ответ

1 голос
/ 10 июля 2020

Я бы рекомендовал просто не использовать здесь countDistinct и добиться желаемого, используя 2 агрегирования подряд, тем более, что у вас есть перекос в ваших данных. Это может выглядеть следующим образом:

import pyspark.sql.functions as F
new_df = (df
  .groupBy("product", "date", "client")
  .agg({}) # getting unique ("product", "date", "client") tuples
  .groupBy("product", "date")
  .agg(F.count('*').alias('clients'))
)

Первая агрегация здесь гарантирует, что у вас есть DataFrame с одной строкой на каждый отдельный («продукт», «дата», «клиент») кортеж, второй - это подсчет числа клиентов для каждой пары («товар», «дата»). Таким образом, вам больше не нужно беспокоиться о перекосах, поскольку Spark будет знать, что нужно выполнять частичные агрегаты за вас (в отличие от countDistinct, который вынужден отправлять все индивидуальные значения «client», соответствующие каждому («product», «date ") пара к одному узлу).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...