Счетчик искр против длины и длины - PullRequest
0 голосов
/ 18 февраля 2019

Я использую com.datastax.spark:spark-cassandra-connector_2.11:2.4.0, когда запускаю ноутбуки zeppelin и не понимаю разницу между двумя операциями в spark.Одна операция занимает много времени для вычислений, вторая выполняется немедленно.Может ли кто-нибудь объяснить мне разницу между двумя операциями:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class SomeClass(val someField:String)

val timelineItems = spark.read.format("org.apache.spark.sql.cassandra").options(scala.collection.immutable.Map("spark.cassandra.connection.host" -> "127.0.0.1", "table" -> "timeline_items", "keyspace" -> "timeline" )).load()
//some simplified code:
val timelineRow = timelineItems
        .map(x => {SomeClass("test")})
        .filter(x => x != null)
        .toDF()
        .limit(4)

//first operation (takes a lot of time. It seems spark iterates through all items in Cassandra and doesn't use laziness with limit 4)
println(timelineRow.count()) //return: 4

//second operation (executes immediately); 300 - just random number which doesn't affect the result
println(timelineRow.take(300).length) //return: 4

1 Ответ

0 голосов
/ 18 февраля 2019

То, что вы видите - это разница между реализацией Limit (операция, подобная преобразованию) и CollectLimit (операция, подобная действию).Однако разница во времени сильно вводит в заблуждение, и в общем случае это не то, чего вы можете ожидать.

Сначала давайте создадим MCVE

spark.conf.set("spark.sql.files.maxPartitionBytes", 500)

val ds = spark.read
  .text("README.md")
  .as[String]
  .map{ x => {
    Thread.sleep(1000)
    x
   }}

val dsLimit4 = ds.limit(4)

. Убедитесь, что мы начинаем с чистого листа:

spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
Boolean = true

вызовите count:

dsLimit4.count()

и посмотрите на план выполнения (из Spark UI):

== Parsed Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
   +- LocalLimit 4
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
            +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
               +- Relation[value#0] text

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
   +- LocalLimit 4
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
            +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
               +- Relation[value#0] text

== Optimized Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
   +- LocalLimit 4
      +- Project
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
            +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
               +- DeserializeToObject value#0.toString, obj#5: java.lang.String
                  +- Relation[value#0] text

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#12L])
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#15L])
   +- *(2) GlobalLimit 4
      +- Exchange SinglePartition
         +- *(1) LocalLimit 4
            +- *(1) Project
               +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
                  +- *(1) MapElements <function1>, obj#6: java.lang.String
                     +- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
                        +- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

Основной компонент

+- *(2) GlobalLimit 4
   +- Exchange SinglePartition
      +- *(1) LocalLimit 4

, что означает, что мы можем ожидать широкую операцию с несколькими этапами.Мы можем видеть одну работу

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(0)

с двумя этапами

spark.sparkContext.statusTracker.getJobInfo(0).get.stageIds
Array[Int] = Array(0, 1)

с восемью

spark.sparkContext.statusTracker.getStageInfo(0).get.numTasks
Int = 8

и одним *Задача 1034 *

spark.sparkContext.statusTracker.getStageInfo(1).get.numTasks
Int = 1

соответственно.

Теперь давайте сравним ее с

dsLimit4.take(300).size

, которая генерирует следующие

== Parsed Logical Plan ==
GlobalLimit 300
+- LocalLimit 300
   +- GlobalLimit 4
      +- LocalLimit 4
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
            +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
               +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
                  +- Relation[value#0] text

== Analyzed Logical Plan ==
value: string
GlobalLimit 300
+- LocalLimit 300
   +- GlobalLimit 4
      +- LocalLimit 4
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
            +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
               +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
                  +- Relation[value#0] text

== Optimized Logical Plan ==
GlobalLimit 4
+- LocalLimit 4
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
      +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
         +- DeserializeToObject value#0.toString, obj#5: java.lang.String
            +- Relation[value#0] text

== Physical Plan ==
CollectLimit 4
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
   +- *(1) MapElements <function1>, obj#6: java.lang.String
      +- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
         +- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

В то время как глобальные илокальные границы все еще встречаются, в середине обмен отсутствует.Поэтому можно ожидать одноступенчатой ​​операции.Обратите внимание, что планировщик сузил предел до более ограничительного значения.

Как и ожидалось, мы видим одну новую работу:

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)

, которая генерирует только одну стадию:

spark.sparkContext.statusTracker.getJobInfo(1).get.stageIds
Array[Int] = Array(2)

только с одной задачей

spark.sparkContext.statusTracker.getStageInfo(2).get.numTasks
Int = 1

Что это значит для нас?

  • В countcase Spark использовал широкое преобразование и фактически применяет LocalLimit к каждому разделу и перетасовывает частичные результаты для выполнения GlobalLimit.
  • В случае take Spark использовал узкое преобразование и оценивал LocalLimit только на первом разделе.

Очевидно, что последний подход не будет работать с числом значений вПервый раздел ниже запрошенного лимита.

val dsLimit105 = ds.limit(105) // There are 105 lines

В этом случае первый count будет использовать ту же логику, что и раньше (я призываю вас подтвердить это эмпирически), но take выберет довольно другой путь.До сих пор мы запустили только два задания:

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)

Теперь, если мы выполним

dsLimit105.take(300).size

, вы увидите, что для этого потребовалось еще 3 задания:

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(4, 3, 2, 1, 0)

Так что здесь происходит?Как отмечалось ранее, оценки одного раздела недостаточно для удовлетворения ограничения в общем случае.В таком случае Spark итеративно оценивает LocalLimit на разделах до тех пор, пока не будет выполнено GlobalLimit, увеличивая количество разделов, взятых на каждой итерации.

Такая стратегия может иметь существенное влияние на производительность.Сам по себе запуск заданий Spark обходится недешево, и в тех случаях, когда вышестоящий объект является результатом широкого преобразования, вещи могут стать довольно уродливыми (в лучшем случае вы можете читать случайные файлы, но если они по какой-то причине потеряны, Spark может быть вынуждендля повторного выполнения всех зависимостей).

Подводя итог :

  • take является действием, которое может привести к короткому замыканию в определенных случаях, когда в восходящем процессеявляется узким, и LocalLimits может быть удовлетворено GlobalLimits с использованием первых нескольких разделов.
  • limit является преобразованием и всегда оценивает все LocalLimits, так как нет итеративного escape-люка.

В то время как один может вести себя лучше, чем другой в определенных случаях, он не подлежит обмену и не гарантирует лучшую производительность в целом.

...