Рассчитываются ли .withColumn и .agg параллельно в pyspark? - PullRequest
0 голосов
/ 22 марта 2019

Рассмотрим, например,

df.withColumn("customr_num", col("customr_num").cast("integer")).\
withColumn("customr_type", col("customr_type").cast("integer")).\
agg(myMax(sCollect_list("customr_num")).alias("myMaxCustomr_num"), \
    myMean(sCollect_list("customr_type")).alias("myMeanCustomr_type"), \
    myMean(sCollect_list("customr_num")).alias("myMeancustomr_num"),\
    sMin("customr_num").alias("min_customr_num")).show()

Are .withColumn и список функций внутри agg (sMin, myMax, myMean и т. Д.), Рассчитанных параллельнопо Spark или по порядку?

Если последовательно, как их распараллелить?

1 Ответ

4 голосов
/ 22 марта 2019

По сути, если у вас более одного раздела, операции всегда распараллеливаются в искре. Если вы имеете в виду, что операции withColumn будут рассчитываться за один проход по набору данных, тогда ответ также - да. В общем, вы можете использовать Spark UI, чтобы узнать больше о том, как все вычисляется.

Давайте рассмотрим пример, очень похожий на ваш.

spark.range(1000)
    .withColumn("test", 'id cast "double")
    .withColumn("test2", 'id + 10)
    .agg(sum('id), mean('test2), count('*))
    .show

А давайте посмотрим на интерфейс.

enter image description here

Range соответствует созданию данных, затем у вас есть project (две операции withColumn) и затем агрегация (agg) в каждом разделе (у нас здесь 2). В данном разделе эти действия выполняются последовательно, но для всех разделов одновременно. Кроме того, они находятся на той же стадии (в синем прямоугольнике), что означает, что все они вычисляются за один проход по данным.

Затем происходит случайное перемешивание (exchange), которое означает, что данные обмениваются по сети (результат агрегации на раздел), и выполняется окончательное агрегирование (HashAggregate), а затем отправляется драйверу (collect)

...