войти от искры UDF к водителю - PullRequest
0 голосов
/ 28 августа 2018

У меня есть простой UDF в кирпичах данных, используемых в spark. Я не могу использовать println или log4j или что-то еще, потому что это будет выведено на исполнение, мне нужно это в драйвере. У меня очень настройки системного журнала

var logMessage = ""

def log(msg: String){
  logMessage += msg + "\n"
}

def writeLog(file: String){
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String){
  log("*WARNING* " + msg)
}

val CleanText = (s: int) => {
  log("I am in this UDF")
  s+2
}

sqlContext.udf.register("CleanText", CleanText)

Как мне заставить это работать должным образом и войти в драйвер?

Ответы [ 2 ]

0 голосов
/ 28 августа 2018

Самый близкий механизм в Apache Spark к тому, что вы пытаетесь сделать, это аккумуляторы . Вы можете накапливать строки журнала на исполнителях и получать доступ к результату в драйвере:

// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")

// log function adds a line to accumulator
def log(msg: String): Unit = logLines.add(msg)

// driver-side function can print the log using accumulator's *value*
def writeLog() {
  import scala.collection.JavaConverters._
  println("start write")
  logLines.value.asScala.foreach(println)
  println("end write")
}

val CleanText = udf((s: Int) => {
  log(s"I am in this UDF, got: $s")
  s+2
})

// use UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()

writeLog()    
// prints: 
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write

НО : это на самом деле не рекомендуется, особенно для целей регистрации. Если вы войдете в каждую запись, этот аккумулятор в конечном итоге вылетит вашего драйвера на OutOfMemoryError или просто ужасно замедлит вас.

Поскольку вы используете блоки данных, я бы проверил, какие опции они поддерживают для агрегации журналов, или просто использовал интерфейс Spark для просмотра журналов исполнителя.

0 голосов
/ 28 августа 2018

Вы не можете ... если вы не хотите сходить с ума и создать своего рода аппендиста для повторного входа, который отправляет журналы по сети или что-то в этом роде.

Код UDF будет запускаться на всех ваших исполнителях при оценке фрейма данных. Таким образом, у вас может быть 2000 хостов, на которых он работает, и каждый из них будет входить в свое собственное местоположение; Вот как работает Spark. Драйвер не тот, кто выполняет код, поэтому он не может быть зарегистрирован.

Вы можете использовать агрегат журналов YARN, чтобы получить все журналы от исполнителей, хотя для последующего анализа.

Возможно, вы могли бы также записать в поток kafka или что-то в этом роде что-нибудь творческое с некоторой работой и записать журналы непрерывно позже из потока.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...