Пример запуска Spark Aggregator - PullRequest
0 голосов
/ 09 июля 2020

Я пытаюсь запустить пример из документации Spark 2.4.3, найденной здесь в кластере Databricks.

Я добавил недостающие методы, и теперь код выглядит следующим образом:

case class Data(i: Int)

val customSummer =  new Aggregator[Data, Int, Int] {
 def zero: Int = 0
 def reduce(b: Int, a: Data): Int = b + a.i
 def merge(b1: Int, b2: Int): Int = b1 + b2
 def finish(r: Int): Int = r
 def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
 def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
}.toColumn

val ds = Seq(Data(1)).toDS
val aggregated = ds.select(customSummer).collect

Я получаю следующую ошибку: org.apache.spark.SparkException: Task not serializable

Я нашел это в трассировке стека: Caused by: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn

Вот полный стек trace .

Вопрос в том, удавалось ли кому-нибудь запустить подобный код? Если да, не могли бы вы указать мне ресурсы, где я могу узнать, что мне не хватает?

Спасибо.

1 Ответ

0 голосов
/ 09 июля 2020

Поместите класс case вне тестового класса и сделайте внешний класс имеющим Aggregator Serializable.

class Test extends Serializable {
 @Test
  def test62805430(): Unit = {

    val customSummer =  new Aggregator[Data, Int, Int] {
      def zero: Int = 0
      def reduce(b: Int, a: Data): Int = b + a.i
      def merge(b1: Int, b2: Int): Int = b1 + b2
      def finish(r: Int): Int = r
      def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
      def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
    }.toColumn

    val ds = Seq(Data(1)).toDS
    val aggregated = ds.select(customSummer).collect
    println(aggregated.mkString(",")) // 1
  }

}

case class Data(i: Int)
...