То, что вы видите - это разница между реализацией Limit
(операция, подобная преобразованию) и CollectLimit
(операция, подобная действию).Однако разница во времени сильно вводит в заблуждение, и в общем случае это не то, чего вы можете ожидать.
Сначала давайте создадим MCVE
spark.conf.set("spark.sql.files.maxPartitionBytes", 500)
val ds = spark.read
.text("README.md")
.as[String]
.map{ x => {
Thread.sleep(1000)
x
}}
val dsLimit4 = ds.limit(4)
. Убедитесь, что мы начинаем с чистого листа:
spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
Boolean = true
вызовите count
:
dsLimit4.count()
и посмотрите на план выполнения (из Spark UI):
== Parsed Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Optimized Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- Project
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject value#0.toString, obj#5: java.lang.String
+- Relation[value#0] text
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#12L])
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#15L])
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
+- *(1) Project
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- *(1) MapElements <function1>, obj#6: java.lang.String
+- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
+- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
Основной компонент
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
, что означает, что мы можем ожидать широкую операцию с несколькими этапами.Мы можем видеть одну работу
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(0)
с двумя этапами
spark.sparkContext.statusTracker.getJobInfo(0).get.stageIds
Array[Int] = Array(0, 1)
с восемью
spark.sparkContext.statusTracker.getStageInfo(0).get.numTasks
Int = 8
и одним *Задача 1034 *
spark.sparkContext.statusTracker.getStageInfo(1).get.numTasks
Int = 1
соответственно.
Теперь давайте сравним ее с
dsLimit4.take(300).size
, которая генерирует следующие
== Parsed Logical Plan ==
GlobalLimit 300
+- LocalLimit 300
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Analyzed Logical Plan ==
value: string
GlobalLimit 300
+- LocalLimit 300
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Optimized Logical Plan ==
GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject value#0.toString, obj#5: java.lang.String
+- Relation[value#0] text
== Physical Plan ==
CollectLimit 4
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- *(1) MapElements <function1>, obj#6: java.lang.String
+- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
+- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
В то время как глобальные илокальные границы все еще встречаются, в середине обмен отсутствует.Поэтому можно ожидать одноступенчатой операции.Обратите внимание, что планировщик сузил предел до более ограничительного значения.
Как и ожидалось, мы видим одну новую работу:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)
, которая генерирует только одну стадию:
spark.sparkContext.statusTracker.getJobInfo(1).get.stageIds
Array[Int] = Array(2)
только с одной задачей
spark.sparkContext.statusTracker.getStageInfo(2).get.numTasks
Int = 1
Что это значит для нас?
- В
count
case Spark использовал широкое преобразование и фактически применяет LocalLimit
к каждому разделу и перетасовывает частичные результаты для выполнения GlobalLimit
. - В случае
take
Spark использовал узкое преобразование и оценивал LocalLimit
только на первом разделе.
Очевидно, что последний подход не будет работать с числом значений вПервый раздел ниже запрошенного лимита.
val dsLimit105 = ds.limit(105) // There are 105 lines
В этом случае первый count
будет использовать ту же логику, что и раньше (я призываю вас подтвердить это эмпирически), но take
выберет довольно другой путь.До сих пор мы запустили только два задания:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)
Теперь, если мы выполним
dsLimit105.take(300).size
, вы увидите, что для этого потребовалось еще 3 задания:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(4, 3, 2, 1, 0)
Так что здесь происходит?Как отмечалось ранее, оценки одного раздела недостаточно для удовлетворения ограничения в общем случае.В таком случае Spark итеративно оценивает LocalLimit
на разделах до тех пор, пока не будет выполнено GlobalLimit
, увеличивая количество разделов, взятых на каждой итерации.
Такая стратегия может иметь существенное влияние на производительность.Сам по себе запуск заданий Spark обходится недешево, и в тех случаях, когда вышестоящий объект является результатом широкого преобразования, вещи могут стать довольно уродливыми (в лучшем случае вы можете читать случайные файлы, но если они по какой-то причине потеряны, Spark может быть вынуждендля повторного выполнения всех зависимостей).
Подводя итог :
take
является действием, которое может привести к короткому замыканию в определенных случаях, когда в восходящем процессеявляется узким, и LocalLimits
может быть удовлетворено GlobalLimits
с использованием первых нескольких разделов. limit
является преобразованием и всегда оценивает все LocalLimits
, так как нет итеративного escape-люка.
В то время как один может вести себя лучше, чем другой в определенных случаях, он не подлежит обмену и не гарантирует лучшую производительность в целом.