PySpark - оптимизировать количество разделов после чтения паркета - PullRequest
0 голосов
/ 05 июня 2018

В озере данных паркета, разделенном на year и month, с spark.default.parallelism, равным, например, 4, допустим, я хочу создать DataFrame, состоящий из месяцев 11 ~ 12 из 2017 и месяцев 1 ~3 из 2018 из двух источников A и B.

df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)

Если я получу количество разделов, Spark по умолчанию использовал spark.default.parallelism:

df.rdd.getNumPartitions()
Out[4]: 4

Принимая во вниманиеучтите, что после создания df мне нужно выполнять операции join и groupBy за каждый период, и эти данные более или менее равномерно распределены по каждому из них (около 10 миллионов строк за период):

Вопрос

  • Улучшит ли перераспределение производительность моих последующих операций?
  • Если да, если у меня будет 10 разных периодов (по 5 в год как в А, так и вБ), следует ли мне перераспределять по количеству периодов и явно ссылаться на столбцы для перераспределения (df.repartition(10,'_MONTH','_YEAR'))?

1 Ответ

0 голосов
/ 05 июня 2018

Будет ли перераспределение улучшать производительность моих последующих операций?

Обычно это не так.Единственная причина преимущественного перераспределения данных состоит в том, чтобы избежать дальнейшей перестановки, когда один и тот же Dataset используется для нескольких объединений, на основе одного и того же условия

Если это так, если у меня есть 10 различных периодов (5 нагод как в A, так и в B), следует ли мне перераспределять по количеству периодов и явно ссылаться на столбцы для перераспределения (df.repartition (10, '_ MONTH', '_ YEAR'))?

Давайтеперейти к следующему шагу:

  • если я перераспределить по количеству периодов

    Практики не гарантируют соотношение между уровнями 1: 1и разделы, так что единственное, что нужно помнить, это то, что вы не можете иметь больше непустых разделов, чем уникальных ключей, поэтому использование значительно большего значения не имеет смысла.

  • и явно ссылаться на столбцы для перераспределения

    Если вы repartition, а затем join или groupBy, используя один и тот же набор столбцов для обеих частей, является единственным разумным решением.

Сводка

repartitoning до объединения имеет смысл в двух сценариях:

  • В случае нескольких последующих joins

    df_ = df.repartition(10, "foo", "bar")
    df_.join(df1, ["foo", "bar"])
    ...
    df_.join(df2, ["foo", "bar"])
    
  • С одним соединением, когда желаемое количество выходных разделов отличается от spark.sql.shuffle.partitions (и нет широковещательного соединения)

    spark.conf.get("spark.sql.shuffle.partitions")
    # 200
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df1_ = df1.repartition(11, "foo", "bar")
    df2_ = df2.repartition(11, "foo", "bar")
    
    df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
    # 11
    
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    # 200
    

    что может быть предпочтительнее, чем:

    spark.conf.set("spark.sql.shuffle.partitions", 11)
    df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
    spark.conf.set("spark.sql.shuffle.partitions", 200)
    
...