org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание не сериализуемо - PullRequest
0 голосов
/ 06 сентября 2018

При выполнении примера Apache Spark появляется следующая ошибка

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача не сериализуема: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn

Ошибка при выполнении последней строки.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator [Employee, Average, Double] {

    def zero: Average = Average(0L,0L)

    def reduce (buffer: Average, employee: Employee): Average = {

          buffer.sum += buffer.sum + employee.salary
          buffer.count += buffer.count + 1
          buffer
    }

    def merge (b1: Average, b2: Average): Average = {

      b1.sum = b1.sum + b2.sum
      b1.count = b1.count + b2.count
      b1  
    }

    def finish (reduction: Average) : Double = reduction.sum/reduction.count

    def bufferEncoder: Encoder[Average] = Encoders.product
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("FileStore/tables/employee.json").as[Employee] 
ds.show()

val averageSalary= MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
...