использование functools для уменьшения на распределенном Spark DataFrame - PullRequest
0 голосов
/ 24 апреля 2020

Я пытаюсь добавить список столбцов в существующий Spark DataFrame.

Пример кода:

columns_list = ['col1', 'col2', 'col3', 'col4']
reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df).show()

Это дает ожидаемые результаты.

Использует ли reduce() в распределенном Spark DataFrame, попытается выполнить все на Single Node?

1 Ответ

1 голос
/ 24 апреля 2020

ОП спросил

Пытается ли использование метода limit () на распределенном Spark DataFrame выполнить все на одном узле?

Но я думаю, что ОП действительно хочет знать равно

отличаются ли следующие команды от точки исполнения Spark?

Создать игрушечные данные

data = [
    ('1',),
    ('2',),
    ('3',),
    ('4',),
]
df = spark.createDataFrame(data, ['id'])

Вы можете увидеть план выполнения Ваш код, используя .explain()

Сценарий 1 (с использованием functools.reduce)

from functools import reduce
from pyspark.sql.functions import col, lit
columns_list = ['col1', 'col2', 'col3', 'col4']
reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df).show()
result1 = reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df)
result1.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#122, NULL AS col2#125, NULL AS col3#129, NULL AS col4#134]
+- Scan ExistingRDD[id#0]

Сценарий 2 (код @ anky с использованием select и понимание списка)

result2 = df.select("*",*[lit('NULL').alias(i) for i in columns_list])
result2.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#140, NULL AS col2#141, NULL AS col3#142, NULL AS col4#143]
+- Scan ExistingRDD[id#0]

Сценарий 3 (используется для l oop и итеративного назначения)

result3 = df
for i in columns_list:
    result3 = result3.withColumn(i, lit('NULL'))

result3.explain()

== Physical Plan ==
*(1) Project [id#0, NULL AS col1#167, NULL AS col2#170, NULL AS col3#174, NULL AS col4#179]
+- Scan ExistingRDD[id#0]

Обратите внимание, что Сценарий 3 не работает в «base» Python (почему functools.reduce() нужно). OP, я предлагаю прочитать о различиях между трансформациями и действиями в Spark . Сначала Spark генерирует «План» выполнения, поэтому Reduce() не требуется.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...