Эффективный способ фильтрации данных на Spark? - PullRequest
0 голосов
/ 23 января 2019

Программа Pyspark .....

df [df ["timeDiff"] <= 30]
        or
df.filter(df["timeDiff"] <= 30)

Оба кода дают одинаковый результат. Но может кто-нибудь объяснить, что будет более эффективным в распределенной среде Spark. или сослаться на некоторые документы. Я попытался выполнить поиск по stackoverflow, но безуспешно ....

Ответы [ 2 ]

0 голосов
/ 23 января 2019

Добавление до @ user10954945, вот планы выполнения для обоих:

import pyspark
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)

df = spark.createDataFrame(((1,), (2,)), ['timeDiff'])
filtered_1 = df[df["timeDiff"] <= 30]
filtered_2 = df.filter(df["timeDiff"] <= 30)

filtered_1.explain()

== Physical Plan ==    
*(1) Filter (isnotnull(timeDiff#6L) && (timeDiff#6L <= 30))
+- Scan ExistingRDD[timeDiff#6L]

filtered_2.explain()

== Physical Plan ==
*(1) Filter (isnotnull(timeDiff#6L) && (timeDiff#6L <= 30))
+- Scan ExistingRDD[timeDiff#6L]

Фактически, вы получаете тот же результат, используя SQL API:

df.createOrReplaceTempView('df')
filtered_3 = spark.sql("SELECT * FROM df WHERE timeDiff <= 30")
filtered_3.explain()

== Physical Plan ==
*(1) Filter (isnotnull(timeDiff#6L) && (timeDiff#6L <= 30))
+- Scan ExistingRDD[timeDiff#6L]
0 голосов
/ 23 января 2019

Оба варианта полностью эквивалентны, когда речь идет о сгенерированном плане выполнения, поэтому вы можете использовать любой, какой пожелаете, - разницы в производительности не будет.

Однако последний является идиоматическим подходом, большинство примеров, учебных пособий и проектов будут использовать этот. Также он в значительной степени идентичен Scala API. Поэтому, как правило, предпочтительно сокращать усилия по разработке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...