![Spark application stages](https://i.stack.imgur.com/lHUV7.png)
В моем приложении Spark одна и та же задача выполняется в несколько этапов.Но эти утверждения были определены только один раз в коде.Более того, для выполнения одних и тех же задач на разных этапах требуется разное время.Я понимаю, что в случае потери СДР, родословная задачи используется для пересчета СДР.Как я могу узнать, так ли это, потому что одно и то же явление наблюдалось во всех прогонах этого приложения.Может кто-нибудь объяснить, что здесь происходит и при каких условиях задача может быть запланирована в несколько этапов.
Код очень похож на следующий:
val events = getEventsDF()
events.cache()
metricCounter.inc("scec", events.count())
val scEvents = events.filter(_.totalChunks == 1)
.repartition(NUM_PARTITIONS, lit(col("eventId")))
val sortedEvents = events.filter(e => e.totalChunks > 1 && e.totalChunks <= maxNumberOfChunks)
.map(PartitionUtil.createKeyValueTuple)
.rdd
.repartitionAndSortWithinPartitions(new EventDataPartitioner(NUM_PARTITIONS))
val largeEvents = events.filter(_.totalChunks > maxNumberOfChunks).count()
val mcEvents = sortedEvents.mapPartitionsWithIndex[CFEventLog](
(index: Int, iter: Iterator[Tuple2]) => doSomething())
val mcEventsDF = session.sqlContext.createDataset[CFEventLog](mcEvents)
metricCounter.inc("mcec", mcEventsDF.count())
val currentDf = scEvents.unionByName(mcEventsDF)
val distinctDateHour = currentDf.select(col("eventDate"), col("eventHour"))
.distinct
.collect
val prevEventsDF = getAnotherDF(distinctDateHour)
val finalDf = currentDf.unionByName(prevEventsDF).dropDuplicates(Seq("eventId"))
finalDf
.write.mode(SaveMode.Overwrite)
.partitionBy("event_date", "event_hour")
.saveAsTable("table")
val finalEventsCount = finalDf.count()
Приводит ли каждое действие count()
к повторному выполнению преобразования RDD перед действием?
СпасибоDevj