Я думаю, seqOp
- это не вложенная функция, а метод, привязанный к какому-то огромному объекту.Возможно, вы на самом деле пытаетесь отправить (acc, article) => this.seqOp(acc, article)
на рабочие узлы, где this
- это тяжелый объект, связанный с еще более тяжелым графом объектов, который находится на вашей главной JVM.Это заставляет ваш главный узел пытаться сериализовать все вещи, связанные с объектом, для которого определен метод, и со стороны это выглядит так, как будто вычисления даже не запускаются должным образом, потому что мастеру никогда не удается отправить весь объектГраф для рабочих узлов.
Когда вы используете синтаксис анонимной функции, он десагарсует примерно так:
rdd.aggregate(0)(
new Function2[Int, WikipediaArticle, Int] {
def apply(acc: Int, article: WikipediaArticle) =
(acc + (if (article.mentionsLanguage(lang)) 1 else 0))
},
_ + _
)
Здесь вы можете сразу увидеть, что экземпляр анонимного локального классарасширение от Function2
не имеет ссылок на какие-либо другие объекты.На самом деле, он даже не имеет переменных-членов, поэтому на самом деле нечего сериализовать (все, что вам нужно знать, это класс этой вещи; она не несет с собой никакой дополнительной информации).
Но когда вы определяете метод seqOp
для некоторых VeryLargeObject
class VeryLargeObject {
val referencesToMillionOtherObjects: Array[Any]
def seqOp(acc: Int, article: WikipediaArticle) = ...
}
, а затем пытаетесь использовать seqOp
в вашем aggregate
методе, spark должен сериализовать экземпляр VeryLargeObject
и вместе с ним все его транзитивные зависимости, а затем отправить его по сети на рабочие узлы.Этот процесс, вероятно, не завершается в течение разумного периода времени, и поэтому кажется, что все приложение заморожено.