NullPointerException при обращении к именам столбцов DataFrame с помощью вызова метода $ - PullRequest
0 голосов
/ 12 апреля 2020

Ниже приведено простое приложение Spark для подсчета слов с использованием DataFrame и соответствующие модульные тесты с использованием spark-testingbase. Это работает, если я использую следующее

def toWords(linesDf: DataFrame) = {
        linesDf
          .select(linesDf("line"),
            explode(split(linesDf("line"), WhitespaceRegex)).as("word"))
}

, но не работает, если я использую $ вызов метода для ссылки на столбцы, как показано ниже

def toWords(linesDf: DataFrame) = {
    import spark.implicits._
    linesDf
        .select($"line"),
            explode(split($"line", WhitespaceRegex)).as("word"))
}

Ошибка

java.lang.NullPointerException was thrown.
java.lang.NullPointerException
    at com.aravind.oss.eg.wordcount.spark.WordCountDFApp$.toWords(WordCountDFApp.scala:42)
    at com.aravind.oss.eg.wordcount.spark.WordCountDFAppTestSpec2$$anonfun$1.apply$mcV$sp(WordCountDFAppTestSpec2.scala:32)
    at com.aravind.oss.eg.wordcount.spark.WordCountDFAppTestSpec2$$anonfun$1.apply(WordCountDFAppTestSpec2.scala:17)
    at com.aravind.oss.eg.wordcount.spark.WordCountDFAppTestSpec2$$anonfun$1.apply(WordCountDFAppTestSpec2.scala:17)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)

Приложение Spark

object WordCountDFApp extends App with Logging {
  logInfo("WordCount with Dataframe API")

  val paths = getPaths(args)
  val cluster = getClusterCfg(args)

  if (paths.size > 1) {
    logInfo("More than one file to process")
  }
  logInfo("Path(s): " + paths)
  logInfo("Cluster: " + cluster)

  val spark = getSparkSession("WordCountDFApp", cluster)

  val linesDf: DataFrame = spark.read
    .textFile(paths: _*)
    .toDF("line") //Dataset[Row]
  logInfo("DataFrame before splitting line")
  linesDf.show(false)

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val wordsDf = toWords(linesDf)

  logInfo("Inferred schema")
  wordsDf.printSchema()

  logInfo("DataFrame after splitting the line into words")
  wordsDf.show(false)

  countWords(wordsDf).show(false)

  def toWords(linesDf: DataFrame) = {
    linesDf
      .select(linesDf("line"),
        explode(split(linesDf("line"), WhitespaceRegex)).as("word"))
  }

}

Тест

class WordCountDFAppTestSpec2 extends FlatSpec with DataFrameSuiteBase {

  val input: Seq[String] = Seq(
    ("one"),
    ("two"),
    (""),
    ("three Three")
  )

  "toWords" should "split the file into words" in {
    val sqlCtx = sqlContext
    import sqlCtx.implicits._
    val sourceDf = input.toDF("line")
    // sourceDf.show(false)

    val expectedDF = Seq(
      ("one", "one"),
      ("two", "two"),
      ("", ""),
      ("three Three", "three"),
      ("three Three", "Three")
    ).toDF("line", "word")
    // expectedDF.show(false)

    val actualDF = WordCountDFApp.toWords(sourceDf)
    // actualDF.show(false)

    assertDataFrameEquals(actualDF, expectedDF)
  }
}

Ответы [ 2 ]

1 голос
/ 12 апреля 2020

Основная проблема заключается в том, что импликации не импортируются во время выполнения, вам необходимо добавить следующую строку:

import linesDf.sparkSession.implicits._

в вашем методе, например:

  def toWords(linesDf: DataFrame) = {
import linesDf.sparkSession.implicits._
linesDf
  .select($"line",
    explode(split(linesDf("line"), WhitespaceRegex)).as("word"))

}

и это решит проблему.

1 голос
/ 12 апреля 2020

Вы должны позвонить / импортировать sqlContext.implicits для доступа к $ (знак доллара) в вашем коде

import spark.sqlContext.implicits._

Таким образом, ваш полный импорт выглядит так:

import spark.implicits._
import spark.sqlContext.implicits._
import org.apache.spark.sql.functions._
...