Below is a simple Spark word-count application using the DataFrame API, together with the corresponding unit tests written with spark-testing-base. It works if I use the following:
def toWords(linesDf: DataFrame) = {
  linesDf
    .select(linesDf("line"),
      explode(split(linesDf("line"), WhitespaceRegex)).as("word"))
}
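For context, linesDf("line"), col("line") and $"line" all evaluate to a Column; the practical difference is that $ only exists after import spark.implicits._, i.e. it needs a concrete SparkSession value in scope, while the other two forms do not. A minimal, self-contained sketch (the local session setup here is mine, not part of the original app):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ColumnRefSketch extends App {
  // Throwaway local session, only for demonstrating the three column-reference styles.
  val spark = SparkSession.builder().master("local[*]").appName("ColumnRefSketch").getOrCreate()
  import spark.implicits._ // brings in the $"..." interpolator (and toDF on Seq)

  val df = Seq("one", "two words").toDF("line")

  df.select(df("line")).show(false)  // resolve the column through the DataFrame itself
  df.select(col("line")).show(false) // resolve the column by name via functions.col
  df.select($"line").show(false)     // resolve the column via the implicit interpolator

  spark.stop()
}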
However, it does not work if I reference the columns with the $ syntax instead, as shown below:
def toWords(linesDf: DataFrame) = {
  import spark.implicits._
  linesDf
    .select($"line",
      explode(split($"line", WhitespaceRegex)).as("word"))
}
Error
java.lang.NullPointerException was thrown.
java.lang.NullPointerException
at com.aravind.oss.eg.wordcount.spark.WordCountDFApp$.toWords(WordCountDFApp.scala:42)
at com.aravind.oss.eg.wordcount.spark.WordCountDFAppTestSpec2$$anonfun$1.apply$mcV$sp(WordCountDFAppTestSpec2.scala:32)
at com.aravind.oss.eg.wordcount.spark.WordCountDFAppTestSpec2$$anonfun$1.apply(WordCountDFAppTestSpec2.scala:17)
at com.aravind.oss.eg.wordcount.spark.WordCountDFAppTestSpec2$$anonfun$1.apply(WordCountDFAppTestSpec2.scala:17)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
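For reference, $ is not a method on DataFrame; it is a string interpolator that import spark.implicits._ brings into scope from the session's SQLImplicits. Roughly (a paraphrase, not the literal Spark source):

import org.apache.spark.sql.ColumnName

// Paraphrase of the implicit that spark.implicits._ provides: $"line" expands to
// new ColumnName("line") via an extension on StringContext. Importing it therefore
// requires a usable SparkSession value to import from.
object DollarInterpolatorSketch {
  implicit class StringToColumn(val sc: StringContext) {
    def $(args: Any*): ColumnName = new ColumnName(sc.s(args: _*))
  }
}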
Spark application
object WordCountDFApp extends App with Logging {
  logInfo("WordCount with Dataframe API")

  val paths = getPaths(args)
  val cluster = getClusterCfg(args)
  if (paths.size > 1) {
    logInfo("More than one file to process")
  }
  logInfo("Path(s): " + paths)
  logInfo("Cluster: " + cluster)

  val spark = getSparkSession("WordCountDFApp", cluster)
  val linesDf: DataFrame = spark.read
    .textFile(paths: _*)
    .toDF("line") // Dataset[Row]
  logInfo("DataFrame before splitting line")
  linesDf.show(false)

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val wordsDf = toWords(linesDf)
  logInfo("Inferred schema")
  wordsDf.printSchema()
  logInfo("DataFrame after splitting the line into words")
  wordsDf.show(false)

  countWords(wordsDf).show(false)

  def toWords(linesDf: DataFrame) = {
    linesDf
      .select(linesDf("line"),
        explode(split(linesDf("line"), WhitespaceRegex)).as("word"))
  }
}
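For comparison, here is a sketch (not the original code) of toWords written so that the implicits are imported from the DataFrame's own session rather than from the App-level spark val; WhitespaceRegex is stubbed because its real definition is not shown above:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{explode, split}

object ToWordsSketch {
  // Stub for illustration; the real app defines its own WhitespaceRegex constant.
  val WhitespaceRegex = "\\s+"

  // Same projection, but $ is resolved against whatever session produced linesDf.
  def toWords(linesDf: DataFrame): DataFrame = {
    val session = linesDf.sparkSession
    import session.implicits._
    linesDf.select($"line",
      explode(split($"line", WhitespaceRegex)).as("word"))
  }
}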
Test
class WordCountDFAppTestSpec2 extends FlatSpec with DataFrameSuiteBase {

  val input: Seq[String] = Seq(
    "one",
    "two",
    "",
    "three Three"
  )

  "toWords" should "split the file into words" in {
    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val sourceDf = input.toDF("line")
    // sourceDf.show(false)

    val expectedDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("", ""),
      ("three Three", "three"),
      ("three Three", "Three")
    ).toDF("line", "word")
    // expectedDF.show(false)

    val actualDF = WordCountDFApp.toWords(sourceDf)
    // actualDF.show(false)

    assertDataFrameEquals(actualDF, expectedDF)
  }
}
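For completeness, DataFrameSuiteBase comes from Holden Karau's spark-testing-base library, which supplies the sqlContext and assertDataFrameEquals used in the test. A build.sbt sketch of the dependencies involved (the version strings are placeholders and have to match the Spark version actually in use):

// build.sbt sketch; versions are placeholders, not pinned values.
val sparkVersion = "<your-spark-version>"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.scalatest" %% "scalatest" % "<your-scalatest-version>" % Test,
  // spark-testing-base releases are versioned as <sparkVersion>_<libraryVersion>
  "com.holdenkarau" %% "spark-testing-base" % "<matching-spark-testing-base-version>" % Test
)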