Spark StructField.dataType создает исключение NullPointerException - PullRequest
0 голосов
/ 15 апреля 2020

Клиент. scala:

...
import Implicits._

val singersDF = Seq(
  ("beatles", "help|hey jude"),
  ("romeo", "eres mia")
).toDF("name", "hit_songs")

val actualDF = singersDF.withColumn(
  "hit_songs",
  split(col("hit_songs"), "\\|")
)

actualDF.foo()
...

Последствия. scala:

...
object Implicits {
  implicit class Implicits(df: DataFrame) extends java.io.Serializable {
    def foo () = {
      df.foreachPartition((partition: Iterator[Row]) => {
        partition.foreach((row: Row) => {
          val types = df.schema.fields.map(_.dataType)
...

Последняя строка бросков:

20/04/15 18:18:00 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.schema(Dataset.scala:465)

1 Ответ

1 голос
/ 15 апреля 2020

Попробуйте добавить schema val для вашего неявного класса:

implicit class Implicits(df: DataFrame) extends java.io.Serializable {
  val schema: StructType = df.schema
  def foo(): Unit = {
    df.foreachPartition {
      (partition: Iterator[Row]) =>
        partition.foreach {
          row: Row =>
            val types = schema.fields.map(_.dataType)
            // ...
        }
        partition
    }
  }
}

Я думаю, что NPE в вашем примере, потому что foreachPartition в DataFrame не может знать что-то о структуре DataFrame и этой абстракции «исчез» при запуске функции процесса передаются в foreach.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...