Почему мой случайный раздел не равен 200 (по умолчанию) во время групповой операции? (Spark 2.4.5) - PullRequest
3 голосов
/ 06 августа 2020

Я новичок в Spark и пытаюсь понять его внутренности. Итак, я читаю небольшой паркетный файл размером 50 МБ из s3 и выполняю группировку, а затем сохраняю обратно в s3. Когда я наблюдаю за пользовательским интерфейсом Spark, я вижу 3 этапа, созданных для этого:

этап 0: загрузка (1 задача)

этап 1: случайный запрос, этап для группировки (12 задач)

этап 2: сохранить (coalescedshufflereader) (26 задач)

Пример кода:

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\                             
 .agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)

Я использую экземпляр EMR с 1 главным узлом, 3 основными узлами (каждый с 4 ядрами). Итак, параллелизм по умолчанию равен 12. Я не меняю никаких конфигураций во время выполнения. Но я не могу понять, почему на финальном этапе создается 26 задач? Насколько я понимаю, по умолчанию раздел в случайном порядке должен быть 200. Скриншот пользовательского интерфейса прилагается.

введите описание изображения здесь

Ответы [ 2 ]

3 голосов
/ 06 августа 2020

Я пробовал аналогичный logi c на Databricks с Spark 2.4.5.

Я заметил, что с spark.conf.set('spark.sql.adaptive.enabled', 'true') окончательное число моих разделов равно 2.

Я наблюдаю что с spark.conf.set('spark.sql.adaptive.enabled', 'false') и spark.conf.set('spark.sql.shuffle.partitions', 75) окончательное количество моих разделов равно 75.

Использование print(df_agg.rdd.getNumPartitions()) показывает это.

Итак, вывод задания в Spark UI не отражает этого . Может быть, в конце произойдет передел. Интересно, но не проблема.

2 голосов
/ 06 августа 2020

В Spark sql количество разделов в случайном порядке устанавливается с помощью spark. sql .shuffle.partitions, которое по умолчанию равно 200. В большинстве случаев это число слишком велико для небольших данных и слишком мало для больших данных. . Выбор правильного значения всегда становится сложным для разработчика.

Таким образом, нам нужна возможность объединять разделы в случайном порядке, глядя на выходные данные средства отображения. Если отображение генерирует небольшое количество разделов, мы хотим уменьшить общее количество разделов в случайном порядке, чтобы повысить производительность.

В последней версии Spark3.0 с Adaptive Query Execution , это Функция сокращения задач автоматизирована. http://blog.madhukaraphatak.com/spark-aqe-part-2/

Принимая во внимание это в Spark2.4.5, также оптимизатор каталистов или EMR, возможно, включили эту функцию, чтобы уменьшить количество задач внутри, а не на 200 задач.

...