«NotSerializableException» в scala функции карты - PullRequest
1 голос
/ 12 февраля 2020

Я читаю файл и пытаюсь отобразить значения с помощью функции. Но он выдает ошибку NotSerializableException Ниже приведен код, который я запускаю:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  def parseLine(line: String) = {
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(parseLine)
}
}

Когда я запускаю приведенный выше код, он выдает ошибку ниже:

Использование профиля Spark по умолчанию для log4j: org / apache / spark / log4j-defaults.properties Исключение в потоке "main" org. apache .spark.SparkException: задача не сериализуется в org. apache .spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner. scala: 403) в орг. apache .spark.util.ClosureCleaner $ .clean (ClosureCleaner. scala: 393) в орг. apache .spark.util.ClosureCleaner $. clean (ClosureCleaner. scala: 162) в орг. apache .spark.SparkContext.clean (SparkContext. scala: 2326) в орг. apache .spark.rdd.RDD. $ anonfun $ map $ 1 ( СДР. scala: 371) на орг. apache .spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope. scala: 151) на орг. apache .spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope. scala: 112) в орг. apache .spark.rdd.RDD.withScope (RDD. scala: 363) в орг. apache .spark.rdd.RDD.map (RDD. scala: 370) на com.sundogsoftware.sp ark.MinTemperas $ .main (MinTemperas. scala: 32) на com.sundogsoftware.spark.MinTemperas.main (MinTemperas. scala)

Причина: java .io.NotSerializableException:

com.sundogsoftware.spark.MinTemperas $ Стек сериализации: - объект не сериализуем (класс: com.sundogsoftware.spark.MinTemperas $, значение: com.sundogsoftware.spark. MinTemperatures$@41fed14f) - элемент массива (индекс : 0) - массив (класс [L java .lang.Object ;, размер 1) - поле (класс: java .lang.invoke.SerializedLambda, имя: capturedArgs, тип: класс [L java .lang .Object;) - объект (класс java .lang.invoke.SerializedLambda, SerializedLambda [capturingClass = класс com.sundogsoftware.spark.MinTemperas $, functionsInterfaceMethod = scala / Function1.apply: (Ljava / lang / Object;) Ljava / lang / Object ;, реализация = invokeStatic com / sundogsoftware / spark / MinTemperas $. $ anonfun $ main $ 1: (Lcom / sundogsoftware / spark / MinTemperas $; Ljava / lang / String;) Lscala / Tuple3 ;, instantiatedMethodType = (Ljava / языки / String;) Lscala / Тот ple3 ;, numCaptured = 1]) - writeReplace data (класс: java .lang.invoke.SerializedLambda)

Но когда я запускаю тот же код с анонимной функцией, он успешно выполняется:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(x => {
      val fields = x.split(",");
      val stationID = fields(0);
      val entryType = fields(2);
      val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f;
      (stationID, entryType, temperature)
    })

    // Filter out all but TMIN entries
    val minTemps = parsedLines.filter(x => x._2 == "TMIN")

    // Convert to (stationID, temperature)
    val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))

    // Reduce by stationID retaining the minimum temperature found
    val minTempsByStation = stationTemps.reduceByKey((x, y) => min(x, y))

    // Collect, format, and print the results
    val results = minTempsByStation.collect()

    for (result <- results.sorted) {
      val station = result._1
      val temp = result._2
      val formattedTemp = f"$temp%.2f F"
      println(s"$station minimum temperature: $formattedTemp")
    }

  }
}

Вывод:

EZE00100082 minimum temperature: 7.70 F
ITE00100554 minimum temperature: 5.36 F

Как вы видели выше, когда я использую именованную функцию (parseLine) внутри карты, она выдает ошибку, но та же программа вместо именованной функции, когда я использовал анонимную функцию в карте, она успешно работает .

Я просмотрел несколько блогов, но не получил причину ошибки. Может ли кто-нибудь помочь мне понять это?

1 Ответ

1 голос
/ 17 февраля 2020

Эта проблема, по-видимому, не связана с sbt или зависимостями, как я проверял, это происходит, когда сценарий не определен как объект (Scala объекты по умолчанию сериализуемы), поэтому эта ошибка означает, что сценарий не сериализации. Я создал новый объект и вставил тот же код. Это сработало.

...