Ваш вопрос очень сложный и хитрый ..
Я протестировал большой набор данных, чтобы воспроизвести ваше поведение.
Описание проблемы
Я протестировал следующие два случаи в большом наборе данных:
# Case 1
df.count() # Execution time: 37secs
# Case 2
df.filter((df['ID'] == id0)).count() #Execution time: 1.39 min
Объяснение
Позволяет увидеть физический план только с .count()
:
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#38L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#41L])
+- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>
Позволяет увидеть физический план с .filter()
а затем .count()
:
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#61L])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#64L])
+- *(1) Project
+- *(1) Filter (isnotnull(ID#11) && (ID#11 = Muhammed MacIntyre))
+- *(1) FileScan csv [ID#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:...], PartitionFilters: [], PushedFilters: [IsNotNull(ID), EqualTo(ID,Muhammed MacIntyre)], ReadSchema: struct<_c1:string>
Как правило, Spark при подсчете количества строк отображает строки с count = 1 и уменьшает все преобразователи для создания окончательного числа строк.
В случае 2 Spark сначала должен отфильтровать, а затем создать частичные значения для каждого раздела, а затем провести еще один этап для их суммирования. Таким образом, для тех же строк во втором случае Spark также выполняет фильтрацию, что влияет на время вычислений в больших наборах данных. Spark - это инфраструктура для распределенной обработки, которая не имеет индексов, подобных Pandas, которые могли бы выполнять фильтрацию очень быстро, не пропуская все строки.
Summary
В этом простом случае вы можете не делать много вещей, чтобы улучшить время выполнения. Вы можете попробовать свое приложение с различными настройками конфигурации (например, # spark. sql .shuffle.partitions, # spark.default.parallelism
, # of executors
, # executor memory
et c)