Проверки набора данных Spark Scala с использованием UDF и его производительности - PullRequest
0 голосов
/ 03 декабря 2018

Я новичок в Spark Scala.Я реализовал решение для проверки набора данных для нескольких столбцов с использованием UDF, а не прохождения отдельных столбцов в цикле for.Но я не знаю, как это работает быстрее, и я должен объяснить, что это было лучшее решение.

Столбцы для проверки данных будут получены во время выполнения, поэтому мы не можем жестко закодировать имена столбцов в коде.А также столбец комментариев должен быть обновлен с именем столбца, когда значение столбца не удалось в ходе проверки.

Старый код,

def doValidate(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = {
var ValidDF: Dataset[Row] = data
var i:Int = 0
for (s <- columnArray) {
        var list = validValueArrays(i).split(",")
    ValidDF = ValidDF.withColumn("comments",when(ValidDF.col(s).isin(list: _*),concat(lit(col("comments")),lit(" Error: Invalid Records in: ") ,lit(s))).otherwise(col("comments")))
    i = i + 1  
}  

return ValidDF;

}

Новый код,

def validateColumnValues(data: Dataset[Row], columnArray: Array[String], validValueArrays: Array[String]): Dataset[Row] = {
 var ValidDF: Dataset[Row] = data
 var checkValues = udf((row: Row, comment: String) => {
  var newComment = comment
  for (s: Int  <- 0 to row.length-1) {
    var value = row.get(s)
    var list = validValueArrays(s).split(",")

     if(!list.contains(value))
      {

       newComment = newComment + " Error:Invalid Records in: " + columnArray(s) +";"
      }
    }
     newComment
  });
ValidDF = ValidDF.withColumn("comments",checkValues(struct(columnArray.head, columnArray.tail: _*),col("comments")))

return ValidDF;
} 

columnArray -> будет иметь список столбцов

validValueArrays -> будет иметь допустимые значения, соответствующие позиции массива столбцов.Несколько допустимых значений будут разделены.

Я хочу знать, какой из них лучше или какой-либо другой лучший подход для этого.Когда я тестировал новый код, он выглядит лучше.А также в чем разница между этими двумя логиками, когда я читаю UDF, это черный ящик для Spark.И в этом случае UDF повлияет на производительность в любом случае?

1 Ответ

0 голосов
/ 03 декабря 2018

Мне нужно исправить некоторые закрытые скобки перед запуском.Один '}' будет удален при возврате validDF.Я все еще получаю ошибку анализа времени выполнения.

Лучше избегать UDF, так как UDF подразумевает десериализацию для обработки данных в классическом Scala, а затем для их повторной сериализации.Однако, если ваше требование не может быть заархивировано с использованием функции сборки SQL, вам нужно обратиться к UDF, но вы должны убедиться, что вы просматриваете SparkUI на предмет производительности и плана выполнения.

...