Как передать функции в RDD.map? - PullRequest
2 голосов
/ 16 апреля 2019

Я боролся с org.apache.spark.SparkException: Task not serializable, но, наконец, понял, как сделать эту работу:

case class Article(id: Int, title: String, content: String) extends Serializable

 val index: RDD[(String, List[(Int, Int)])] = (for {
      article <- articlesRDD
      text = article.title + article.content
      word <- text.split(" ")
    } yield (word, (article.id, 1)))
      .groupByKey()
      .mapPartitions{
        _.map {
          case(k, v) => (k, v.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList) // Works as expected
          //case(k, v) => (k, reducer(v.toList)) // Fails
        }
      }.cache()

А вот и reducer:

def reducer(list: List[(Int, Int)]): List[(Int, Int)] = {
    list.groupBy(_._1).map(
      pair => (pair._1, pair._2.map(_._2).sum)
    ).toList
  }

Я также пытался определить функцию reducer как val, но я получаю ту же ошибку. На самом деле, ошибка возникает в записной книжке Databricks, на моей машине с Spark в локальном режиме она работает нормально.

Почему закомментированный оператор case не работает? Должен ли я всегда передавать анонимные функции, даже если они не так тривиальны, как моя reducer функция?

Заранее спасибо :)

1 Ответ

2 голосов
/ 16 апреля 2019

Вы не говорите , где определено reducer, но, скорее всего, оно относится к несериализуемому классу (например, к классу, содержащему SparkContext и т. Д.). Затем его использование требует захвата экземпляра, на котором он вызывается. Вместо этого определите это в object.

Из Руководства по программированию Spark :

API Spark в значительной степени зависит от передачи функций в программе драйвера для запуска в кластере. Есть два рекомендуемых способа сделать это:

  • Синтаксис анонимной функции, который можно использовать для коротких фрагментов кода.
  • Статические методы в глобальном одноэлементном объекте. Например, вы можете определить object MyFunctions и затем передать MyFunctions.func1 следующим образом:

    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    
    myRdd.map(MyFunctions.func1)
    

Обратите внимание, что хотя также можно передавать ссылку на метод в экземпляре класса (в отличие от одноэлементного объекта), для этого требуется отправить объект, содержащий этот класс, вместе с методом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...