Spark вызывает автоматическое разделение - PullRequest
0 голосов
/ 04 февраля 2019

Я использую кластер EMR со следующей конфигурацией: 1 ведущий, 4 ведомых.Общее количество исполнителей: 11, каждый исполнитель имеет 5 ядер и 34 ГБ памяти.Для динамического выделения установлено значение True.

Мой рабочий процесс pyspark состоит из следующих шагов:

  1. Чтение в 50 файлах CSV с общей суммой 100 000 строк из S3в кадре данных искры (DF).Запуск df.rdd.getNumPartitions () возвращает 50 разделов.

  2. Используя df.repartition (1000), я перераспределяю этот фрейм данных на 1000 разделов.

  3. Используя функцию UDF от pandas, я выполняю некоторые операции с этим фреймом данных и присваиваю результат другому фрейму данных (result_df).Этот информационный кадр имеет дополнительный строковый столбец по сравнению с df.

  4. Теперь я запускаю result_df.write.format ("parquet"). Save (s3_path_dest, mode = "overwrite"), чтобы сохранитьрезультат к месту S3.На этом этапе я вижу, что я получаю 2 этапа «сохранения» в веб-интерфейсе spark.Первый этап имеет 50 задач, указывающих, что имеется 50 разделов, в то время как второй этап имеет 1000 задач, соответствующих 1000 разделам.

  5. Затем я запускаю другую функцию UDF pandas на приведенном выше кадре данных (result_df), которыйвозвращает еще 2 столбца в дополнение к более старым 2 столбцам и сохраняет полученный df в папку S3, как это было сделано на шаге 4. Теперь я вижу, что снова есть 2 этапа: первый этап имеет только 5 задач, а второй - 1000 задач.

Вопрос 1: Почему произошло это «автоматическое» перераспределение?(5 разделов)

Вопрос 2. Почему этот процесс разбит на 2 этапа, где каждый этап работает с различным количеством разделов?Я ожидаю, что один этап будет работать с 1000 задачами / разделами.

Вопрос 3: На вкладке «Искровой интерфейс SQL» я вижу, что последний шаг начинается с шага «FileScan csv».Означает ли это, что когда я выполняю свой последний шаг, весь рабочий процесс запускается заново?Я не ожидаю этого, поскольку я уже запустил промежуточное действие сохранения (шаг 4), которое должно выполнить DAG до этой точки и не нужно пересчитывать результаты, которые могут понадобиться на следующих шагах.

Спасибозаранее за вашу помощь.

...