Apache Spark: как 200 задач-редукторов могут агрегировать 20000+ картографических результатов? - PullRequest
1 голос
/ 07 ноября 2019

Обновленный вопрос

Что мне непонятно => в ShuffleMapStage каждый Mapper создаст .data и .index file

Эти data / index будут иметь имя подобно

shuflle_X_Y_Z

где

X = shuffle_id

Y = map_id

Z = REDUCER_ID

I Понимание map_id может варьироваться от 1-222394

НО КАК О REDUCER_ID ?

это так1-200 (например, раздел по умолчанию для ResultStage)?

это = число исполнителей?

, если это 1-200, то делаетКак эти 200 задач знают, какие данные / индексный файл читать?

Помогите мне понять, что

Я не понимаю, как Уменьшить / Агрегировать задачи работают? Скажем, у меня есть простой пример, например

input_df = spark.read.parquet("Big_folder_having parquets")

# Spark loads and during reading partitions = as per number of files * number of 128MB blocks.

# Now I do a Simple Aggergation/Count 

input_df.createOrReplaceTempView("table1")

grouped_df = spark.sql("select key1, key2, count(1) as user_count from table1 group by 1,2")

# And simply write it with default 200 parallelism

grouped_df.write.format("parquet").mode("overwrite").save(my_save_path)

Так что для входная нагрузка родительский rdd / входная карта Этап имеет 22394 разделов

Как я понимаю, каждый картограф создаст файл данных и индекса shuflle

Теперь Следующий этап имеет только 200 задач ( разделы по умолчанию в случайном порядке )

Как эти 200 редукторов / задач могут обрабатывать выходные данные из 22394 задач mapper ?

В приложении Снимок экрана DAG enter image description here

Ответы [ 2 ]

2 голосов
/ 07 ноября 2019

У вас есть кластер с 40 ядрами.

Что происходит:

Вы просите Spark прочитать файлы в каталоге, он будет выполнять 40 задач одновременно (с тех порэто количество ядер, которое вы получили), и в результате получится RDD с 22 394 разделами. (Будьте осторожны с разливом в случайном порядке. Проверьте детали этапа.)

Затем вы просите Spark сгруппировать ваши данные по некоторым ключам, а затем записать их.

Поскольку разделами в случайном порядке по умолчанию является 200Spark будет «перемещать» данные из 22 394 разделов в 200 разделов и обрабатывать одновременно 40 задач / разделов.

Другими словами ...

Когда вы запрашиваете группирование и сохранение, Sparkсоздайте планы (я рекомендую вам исследовать физические и логические планы), и он скажет: «Для того, чтобы делать то, о чем меня просит пользователь, я создам 200 задач, которые будут выполнены с данными»

Тогда исполнители будут выполнять 40 задач за один раз.

По сути, нет картографов или редукторов.

Есть задачи, которые Spark создаст, и есть исполнители, которые выполнят эти задачи.

Редактировать:

Забыл упомянуть, что количество разделов в СДР будет определять количество выходных файлов.

0 голосов
/ 07 ноября 2019

Если у вас есть 10 ведер с 10 яблоками или 1 ведро с 100 яблоками, это все равно общее количество яблок.

Спрашивать, как он может справиться, это все равно, что спрашивать, как вы можете нести 10 ведер или нести 1 ведро.

Он будет либо делать это, либо не будет зависеть от объема данных, которые у вас есть. Проблема, с которой вы можете столкнуться, заключается в том, что данные выливаются на диск, поскольку при наличии 200 разделов каждый раздел должен обрабатывать больше данных, которые не обязательно помещаются в память.

...