Я использую onTaskEnd
Spark listener, чтобы получить количество записей, записанных в файл, например:
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWritten: Long = 0L
val rowCountListener: SparkListener = new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWritten += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
}
}
def rowCountOf(proc: => Unit): Long = {
recordsWritten = 0L
spark.sparkContext.addSparkListener(rowCountListener)
try {
proc
} finally {
spark.sparkContext.removeSparkListener(rowCountListener)
}
recordsWritten
}
val rc = rowCountOf { (1 to 100).toDF.write.csv(s"test.csv") }
println(rc)
=> 100
Однако, пытаясь запустить несколько действий в потоках, это явно обрывается:
Seq(1, 2, 3).par.foreach { i =>
val rc = rowCountOf { (1 to 100).toDF.write.csv(s"test${i}.csv") }
println(rc)
}
=> 600
=> 700
=> 750
Я могу сделать так, чтобы каждый поток объявлял свою собственную переменную, но контекст spark по-прежнему используется совместно, и я не могу определить, к какому потоку относится конкретное событие SparkListenerTaskEnd
. Есть ли способ заставить его работать?
(Правильно, может быть, я мог бы просто сделать это отдельными заданиями-искрами. Но это всего лишь одна часть программы, поэтому для простоты я бы предпочел остаться с потоками. В худшем случае я просто выполню это серийно или забудь про подсчет записей ...)