Я бы рекомендовал просто не использовать здесь countDistinct
и добиться желаемого, используя 2 агрегирования подряд, тем более, что у вас есть перекос в ваших данных. Это может выглядеть следующим образом:
import pyspark.sql.functions as F
new_df = (df
.groupBy("product", "date", "client")
.agg({}) # getting unique ("product", "date", "client") tuples
.groupBy("product", "date")
.agg(F.count('*').alias('clients'))
)
Первая агрегация здесь гарантирует, что у вас есть DataFrame с одной строкой на каждый отдельный («продукт», «дата», «клиент») кортеж, второй - это подсчет числа клиентов для каждой пары («товар», «дата»). Таким образом, вам больше не нужно беспокоиться о перекосах, поскольку Spark будет знать, что нужно выполнять частичные агрегаты за вас (в отличие от countDistinct
, который вынужден отправлять все индивидуальные значения «client», соответствующие каждому («product», «date ") пара к одному узлу).