Я боролся с org.apache.spark.SparkException: Task not serializable
, но, наконец, понял, как сделать эту работу:
case class Article(id: Int, title: String, content: String) extends Serializable
val index: RDD[(String, List[(Int, Int)])] = (for {
article <- articlesRDD
text = article.title + article.content
word <- text.split(" ")
} yield (word, (article.id, 1)))
.groupByKey()
.mapPartitions{
_.map {
case(k, v) => (k, v.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList) // Works as expected
//case(k, v) => (k, reducer(v.toList)) // Fails
}
}.cache()
А вот и reducer
:
def reducer(list: List[(Int, Int)]): List[(Int, Int)] = {
list.groupBy(_._1).map(
pair => (pair._1, pair._2.map(_._2).sum)
).toList
}
Я также пытался определить функцию reducer
как val
, но я получаю ту же ошибку. На самом деле, ошибка возникает в записной книжке Databricks, на моей машине с Spark в локальном режиме она работает нормально.
Почему закомментированный оператор case
не работает?
Должен ли я всегда передавать анонимные функции, даже если они не так тривиальны, как моя reducer
функция?
Заранее спасибо :)