Я попытался использовать groupBy на моем фрейме данных после добавления нового столбца, но столкнулся с проблемой Task NotSerializable - PullRequest
1 голос
/ 11 апреля 2020

Это мой код, я получаю ошибку Task Not Serializable, когда я делаю это result.groupBy("value")

object Test extends  App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName("https://SparkByExamples.com")
    .getOrCreate()
  import spark.implicits._

  def myUDF = udf { (v: Double) =>
    if (v < 0) 100
    else 500
  }

  val central: DataFrame = Seq((1, 2014),(2, 2018)).toDF("key", "year1")

  val other1: DataFrame = Seq((1, 2016),(2, 2015)).toDF("key", "year2")

  val result = central.join(other1, Seq("key"))
    .withColumn("value", myUDF(col("year2")))
  result.show()

  val result2 = result.groupBy("value")
  .count()
 result2.show()
}

1 Ответ

1 голос
/ 11 апреля 2020

Я запустил тот же код, что и у меня нет задачи, не сериализуемой. В некоторых случаях вы заблуждаетесь.

import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object Test extends  App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)
  val spark: SparkSession = SparkSession.builder()
    .master("local[4]")
    .appName("https://SparkByExamples.com")
    .getOrCreate()
  import spark.implicits._

  def myUDF = udf { (v: Double) =>
    if (v < 0) 100
    else 500
  }

  val central: DataFrame = Seq((1, 2014),(2, 2018)).toDF("key", "year1")

  val other1: DataFrame = Seq((1, 2016),(2, 2015)).toDF("key", "year2")

  val result = central.join(other1, Seq("key"))
    .withColumn("value", myUDF(col("year2")))
  result.show()

  val result2 = result.groupBy("value")
    .count()
  result2.show()
}

Результат:

+---+-----+-----+-----+
|key|year1|year2|value|
+---+-----+-----+-----+
|  1| 2014| 2016|  500|
|  2| 2018| 2015|  500|
+---+-----+-----+-----+

+-----+-----+
|value|count|
+-----+-----+
|  500|    2|
+-----+-----+

Вывод:

Подобные ситуации могут возникнуть, когда ваша версия spark не совместима с вашей Scala version.

отметьте это https://mvnrepository.com/artifact/org.apache.spark/spark-core для всех версий и соответствующих scala версий, которые вам необходимо использовать.

enter image description here

...