Считать записи в Spark с потоками - PullRequest
0 голосов
/ 31 мая 2019

Я использую onTaskEnd Spark listener, чтобы получить количество записей, записанных в файл, например:

import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWritten: Long = 0L

val rowCountListener: SparkListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWritten += taskEnd.taskMetrics.outputMetrics.recordsWritten
    }
  }
}

def rowCountOf(proc: => Unit): Long = {
  recordsWritten = 0L
  spark.sparkContext.addSparkListener(rowCountListener)
  try {
    proc
  } finally {
    spark.sparkContext.removeSparkListener(rowCountListener)
  }
  recordsWritten
}

val rc = rowCountOf { (1 to 100).toDF.write.csv(s"test.csv") }
println(rc)

=> 100

Однако, пытаясь запустить несколько действий в потоках, это явно обрывается:

Seq(1, 2, 3).par.foreach { i =>
  val rc = rowCountOf { (1 to 100).toDF.write.csv(s"test${i}.csv") }
  println(rc)
}

=> 600
=> 700
=> 750

Я могу сделать так, чтобы каждый поток объявлял свою собственную переменную, но контекст spark по-прежнему используется совместно, и я не могу определить, к какому потоку относится конкретное событие SparkListenerTaskEnd. Есть ли способ заставить его работать?

(Правильно, может быть, я мог бы просто сделать это отдельными заданиями-искрами. Но это всего лишь одна часть программы, поэтому для простоты я бы предпочел остаться с потоками. В худшем случае я просто выполню это серийно или забудь про подсчет записей ...)

1 Ответ

0 голосов
/ 31 мая 2019

Немного хакерский, но вы можете использовать аккумуляторы как побочный эффект фильтрации

val acc = spark.sparkContext.longAccumulator("write count")
df.filter { _ =>
  acc.add(1)
  true
}.write.csv(...)
println(s"rows written ${acc.count}")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...