Pyspark: рассчитывать на pyspark. sql .dataframe. DataFrame занимает много времени - PullRequest
2 голосов
/ 31 января 2020

У меня есть pyspark.sql.dataframe.DataFrame, как показано ниже

df.show()
+--------------------+----+----+---------+----------+---------+----------+---------+
|                  ID|Code|bool|      lat|       lon|       v1|        v2|       v3|
+--------------------+----+----+---------+----------+---------+----------+---------+
|5ac52674ffff34c98...|IDFA|   1|42.377167| -71.06994|17.422535|1525319638|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37747|-71.069824|17.683573|1525319639|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37757| -71.06942|22.287935|1525319640|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37761| -71.06943|19.110023|1525319641|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.377243| -71.06952|18.904774|1525319642|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378254| -71.06948|20.772903|1525319643|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37801| -71.06983|18.084948|1525319644|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378693| -71.07033| 15.64326|1525319645|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378723|-71.070335|21.093477|1525319646|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37868| -71.07034|21.851894|1525319647|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.378716| -71.07029|20.583202|1525319648|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37872| -71.07067|19.738768|1525319649|36.853622|
|5ac52674ffff34c98...|IDFA|   1|42.379112| -71.07097|20.480911|1525319650|36.853622|
|5ac52674ffff34c98...|IDFA|   1| 42.37952|  -71.0708|20.526752|1525319651| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37902| -71.07056|20.534052|1525319652| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.380203|  -71.0709|19.921381|1525319653| 44.93808|
|5ac52674ffff34c98...|IDFA|   1| 42.37968|-71.071144| 20.12599|1525319654| 44.93808|
|5ac52674ffff34c98...|IDFA|   1|42.379696| -71.07114|18.760069|1525319655| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38011| -71.07123|19.155525|1525319656| 36.77853|
|5ac52674ffff34c98...|IDFA|   1| 42.38022|  -71.0712|16.978994|1525319657| 36.77853|
+--------------------+----+----+---------+----------+---------+----------+---------+
only showing top 20 rows

Если попытаться count

%%time
df.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 28.1 s

30241272

сейчас, если я возьму подмножество df время для подсчета еще длиннее.

id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )

%%time
tmp.count()

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 1min 33s
Out[6]:
3299

Ответы [ 2 ]

0 голосов
/ 01 февраля 2020

Ваш вопрос очень сложный и хитрый ..

Я протестировал большой набор данных, чтобы воспроизвести ваше поведение.

Описание проблемы

Я протестировал следующие два случаи в большом наборе данных:

# Case 1
df.count() # Execution time: 37secs

# Case 2
df.filter((df['ID'] == id0)).count() #Execution time: 1.39 min

Объяснение

Позволяет увидеть физический план только с .count():

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#38L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
      +- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

Позволяет увидеть физический план с .filter() а затем .count():

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#61L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#64L])
      +- *(1) Project
         +- *(1) Filter (isnotnull(ID#11) && (ID#11 = Muhammed MacIntyre))
            +- *(1) FileScan csv [ID#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(ID), EqualTo(ID,Muhammed MacIntyre)], ReadSchema: struct<_c1:string>

Как правило, Spark при подсчете количества строк отображает строки с count = 1 и уменьшает все преобразователи для создания окончательного числа строк.

В случае 2 Spark сначала должен отфильтровать, а затем создать частичные значения для каждого раздела, а затем провести еще один этап для их суммирования. Таким образом, для тех же строк во втором случае Spark также выполняет фильтрацию, что влияет на время вычислений в больших наборах данных. Spark - это инфраструктура для распределенной обработки, которая не имеет индексов, подобных Pandas, которые могли бы выполнять фильтрацию очень быстро, не пропуская все строки.

Summary

В этом простом случае вы можете не делать много вещей, чтобы улучшить время выполнения. Вы можете попробовать свое приложение с различными настройками конфигурации (например, # spark. sql .shuffle.partitions, # spark.default.parallelism, # of executors, # executor memory et c)

0 голосов
/ 31 января 2020

Это потому, что искра лениво оценивается . Когда вы вызываете tmp.count (), это шаг действия. Другими словами, ваше время tmp.count также включает время фильтра. Если вы хотите действительно сравнить два числа, попробуйте следующее:

%%time
df.count()

id0 = df.first().ID  ## First ID
tmp = df.filter( (df['ID'] == id0) )
tmp.persist().show()

%%time
tmp.count()

Важным компонентом здесь является tmp.persist (). Show () ДО , выполняющий подсчет. Это выполняет фильтр и кэширует результат. Таким образом, tmp.count () включает только фактическое время подсчета.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...