Оптимизировать код для использования альтернативы countDistinct / ок_счет_дистинкт - PullRequest
0 голосов
/ 12 октября 2018

Фрейм данных (df), с которым я сейчас работаю, содержит 10 миллионов строк.Это выглядит так:

col1 col2
a     p1        
a     p1        
a     p2        
a     p2       
a     p3        
a     p3        
b     p1        
b     p3        
b     p3        
b     p2         
b     p2        
b     p2    

Мне нужно подсчитать различное количество значений в col2, для каждого значения в col1.

Итак, мой ожидаемый результат -

col4   col5  col6
a      p1    2
a      p2    2
a      p3    2
b      p1    1
b      p2    3
b      p3    2

Я пытался использовать countDistinct и approx_count_distinct, но для запуска кода (> 10 часов) требуется несколько часов, что довольно печально!

Я попробовал следующие коды

Code1

df2 = df
    .groupBy($"col1", $"col2")
    .agg(approx_count_distinct($"col2"))

Code2 (занимает больше времени, чем code1)

df2 = df
    .groupBy($"col1", $"col2")
    .agg(countDistinct($"col2"))

Есть ли лучший способ подсчета различных значений, который уменьшил бы время выполнения моего кода?Спасибо.

Ответы [ 2 ]

0 голосов
/ 12 октября 2018

ваш код не компилируется (по крайней мере, в Spark 2.3.1).

Мне нужно подсчитать различное количество значений в col2 для каждого значения в col1.

Чтобы сделать это, вы должны сделать

val df2 = df
.groupBy($"col1")
.agg(countDistinct($"col2"))

Вы уверены, что эта часть кода занимает столько времени?Вы можете попытаться запустить df.rdd.count, чтобы увидеть, что ваше узкое место уже в df

0 голосов
/ 12 октября 2018

Что такое col3?

Все, что вам нужно, это считать, я бы подумал

df2 = df.groupBy($"col1",$"col2").count

И многие исполнители, возможно,

Вот пример из данных, которые выопубликовано

scala> val df = Seq(
     | ("a", "p1"),
     | ("a", "p1"),
     | ("a", "p2"),
     | ("a", "p2"),
     | ("a", "p3"),
     | ("a", "p3"),
     | ("b", "p1"),
     | ("b", "p3"),
     | ("b", "p3"),
     | ("b", "p2"),
     | ("b", "p2"),
     | ("b", "p2")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: string, col2: string]

scala> val df2 = df.groupBy($"col1",$"col2").count
df2: org.apache.spark.sql.DataFrame = [col1: string, col2: string, count: bigint]

scala> df2.show
+----+----+-----+
|col1|col2|count|
+----+----+-----+
|   a|  p1|    2|
|   a|  p2|    2|
|   a|  p3|    2|
|   b|  p1|    1|
|   b|  p2|    3|
|   b|  p3|    2|
+----+----+-----+
...