Порядок исполнения и необходимость кеширования - PullRequest
0 голосов
/ 07 мая 2018

Давайте рассмотрим такой фрагмент псевдокода Python, используя spark.

    rdd1 = sc.textFile("...")
    rdd2 = rdd1.map().groupBy().filter()
    importantValue = rdd2.count()
    rdd3 = rdd1.map(lambda x : x / importantValue)

В DAG задач spark есть две ветви после создания rdd1. В обеих ветвях используется rdd1, но во второй (вычисление rdd3) также используется агрегированное значение из rdd2 (importantValue). Я предполагаю, что DAG выглядит примерно так: enter image description here Я прав? Если да, можем ли мы предположить, что rdd1 используемый в вычислениях rdd3 все еще обрабатывается в памяти? Или мы должны кэшировать rdd1, чтобы предотвратить повторную загрузку этого?

В целом, если DAG выглядит так: enter image description here Можем ли мы предположить, что обе ветви вычисляются параллельно и используют одну и ту же копию rdd1? Или драйвер Spark будет вычислять эти ветви одну за другой, потому что это два разных этапа? Я знаю, что перед запуском искровой драйвер разбивает DAG на этапы и более детальные логические части - tasks Задачи в пределах одной стадии могут быть вычислены в pararell, потому что внутри нет шаффовой стадии, но как насчет двух ветвей pararell, как на изображении? Я знаю всю интуицию, ведущую к странной абстракции (ленивая оценка и т. Д.), Но это не облегчает мне понимание. Пожалуйста, дайте мне какие-либо советы.

Ответы [ 2 ]

0 голосов
/ 07 мая 2018

Можно ли предположить, что rdd1, используемый в вычислениях rdd3, все еще обрабатывается в памяти? Или мы должны кэшировать rdd1, чтобы предотвратить повторную загрузку этого?

rdd1 не кэшируется. Для его кеширования необходимо явно его кешировать и убедиться, что СДР, возвращаемый .cache(), хранится в переменной, используемой для создания rdd2:

rdd1 = sc.textFile("...").cache()
rdd2 = rdd1.map().groupBy().filter()

Если бы это было добавлено к вашей диаграмме, был бы другой RDD между rdd1 и rdd2.

Можем ли мы предположить, что обе ветви вычисляются параллельно и использовать одну и ту же копию rdd1? Или драйвер Spark будет вычислять эти ветви одну за другой, потому что это два разных этапа?

То, что определяет, выполняются ли параллельно выполняемые задания, не является (ветвями) происхождением, а самими запросами заданий Предположим, вы запустили:

rdd4.count()
rdd5.count()

Это будет выполнено rdd1 -> rdd2 -> rdd4, затем считать, , затем rdd1 -> rdd3 -> rdd5. Это вычисление будет последовательным.

Чтобы запустить эти два графика параллельно, задания должны быть представлены параллельно (асинхронные вызовы). Есть много вопросов по этой теме. Отметьте это и это .

0 голосов
/ 07 мая 2018

Я предполагаю, что DAG выглядит примерно так: Я прав?

Да.

Если да, можем ли мы предположить, что rdd1, используемый в вычислениях rdd3, все еще обрабатывается в памяти?

Нет. Spark использует ленивую оценку для обработки данных. Это означает, что ничего не вычисляется, пока это не нужно. И ничего не будет сохранено, если для этого нет явного заявления.

Или мы должны кэшировать rdd1, чтобы предотвратить повторную загрузку этого?

Точно, вам нужно будет кешировать rdd1, чтобы текстовый файл не читался дважды.

В более общем плане, если DAG выглядит следующим образом: можем ли мы предположить, что обе ветви вычисляются параллельно и использовать одну и ту же копию rdd1? Или драйвер Spark будет вычислять эти ветви одну за другой, потому что это два разных этапа?

Две ветви не будут обрабатываться параллельно, так как они имеют отдельные линии. Как правило, данные не обрабатываются до тех пор, пока не будет выполнено действие. Всякий раз, когда нужен результат (читай, требуется действие), происходит обработка данных для всех последующих преобразований и текущего действия в данной линии. После этого никакие данные не будут существовать в памяти, если не был вызван cache.

См. колода для объяснения трансформации против действий

...