Как вы видите размеры разделов, созданных для функции Window в pyspark? - PullRequest
0 голосов
/ 27 августа 2018

Мой искровой (pyspark) ETL, использующий оконную функцию, перестал работать.Интересно, есть ли асимметрия в данных.Окно делает что-то вроде

windowSpec = Window.partitionBy('user').orderBy('time').rowsBetween(1, 1)
next_time = F.lead('time', 1).over(windowSpec)

Что если у данных есть какие-то посторонние пользователи с большим количеством данных?Когда пользователь запускает разделы, чтобы открыть окно, я представляю, что могу получить раздел, который слишком велик - я вижу, что только два из многих заданий не работают (задание может быть неправильной терминологией).

Как мне это проверить?Я знаю, что могу сделать df.groupBy('user').count() и отыскать посторонних пользователей, но как мне увидеть, насколько большими будут разделы, необходимые для функции Window?Я надеюсь, что спарк автоматически поместит несколько крупных пользователей в один раздел и множество мелких пользователей в другие.

1 Ответ

0 голосов
/ 27 августа 2018

Оконные функции с PARTITION BY используют тот же самый разделитель хеша, что и стандартные агрегаты.Давайте подтвердим данные примерами:

import string
import random
from pyspark.sql import functions as F

random.seed(1)
spark.conf.set("spark.sql.shuffle.partitions", 11)

df = spark.createDataFrame([
    random.sample(string.ascii_letters, 1) +  [random.random()] 
    for _ in range(10000)
], ("user", "time"))

С агрегацией:

(df.groupBy("user")
    .count()
    .groupBy(spark_partition_id().alias("partition"))
    .agg(F.sum("count"))
    .orderBy("partition")
    .show())

# +---------+----------+
# |partition|sum(count)|
# +---------+----------+
# |        0|       954|
# |        1|       605|
# |        2|      1150|
# |        3|      1339|
# |        4|       922|
# |        5|       751|
# |        6|       562|
# |        7|       579|
# |        8|      1440|
# |        9|       582|
# |       10|      1116|
# +---------+----------+

С оконными функциями:

(df
    .withColumn("next_time", next_time)
     # Ensure that window is not removed from the execution plan
    .rdd.toDF()  
    .groupBy(spark_partition_id().alias("partition"))
    .count()
    .orderBy("partition")
    .show())

# +---------+-----+
# |partition|count|
# +---------+-----+
# |        0|  954|
# |        1|  605|
# |        2| 1150|
# |        3| 1339|
# |        4|  922|
# |        5|  751|
# |        6|  562|
# |        7|  579|
# |        8| 1440|
# |        9|  582|
# |       10| 1116|
# +---------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...