Агрегат Scala Spark RDD ведет себя странно - PullRequest
0 голосов
/ 24 апреля 2019

У меня есть 2 seqOp функции, переданные aggregate, которые я ожидаю получить идентичные результаты. Они не.

Эта версия работает:

rdd.aggregate(0)((acc, article) => (acc + (if (article.mentionsLanguage(lang)) 1 else 0)), _ + _)

Эта версия не работает:

def seqOp(acc: Int, article: WikipediaArticle): Int = {
  acc + (if (article.mentionsLanguage(lang)) 1 else 0)
}

rdd.aggregate(0)(seqOp, _ + _)

По какой-то причине последняя версия зависает, ничего не делая, не потребляет ЦП и не выдает ошибок. По жизни я не вижу, как эти функции отличаются. Я что-то неправильно понимаю в лямбда-синтаксисе?

Ответы [ 2 ]

2 голосов
/ 24 апреля 2019

Я думаю, seqOp - это не вложенная функция, а метод, привязанный к какому-то огромному объекту.Возможно, вы на самом деле пытаетесь отправить (acc, article) => this.seqOp(acc, article) на рабочие узлы, где this - это тяжелый объект, связанный с еще более тяжелым графом объектов, который находится на вашей главной JVM.Это заставляет ваш главный узел пытаться сериализовать все вещи, связанные с объектом, для которого определен метод, и со стороны это выглядит так, как будто вычисления даже не запускаются должным образом, потому что мастеру никогда не удается отправить весь объектГраф для рабочих узлов.

Когда вы используете синтаксис анонимной функции, он десагарсует примерно так:

rdd.aggregate(0)(
  new Function2[Int, WikipediaArticle, Int] {
    def apply(acc: Int, article: WikipediaArticle) = 
      (acc + (if (article.mentionsLanguage(lang)) 1 else 0))
  }, 
  _ + _
)

Здесь вы можете сразу увидеть, что экземпляр анонимного локального классарасширение от Function2 не имеет ссылок на какие-либо другие объекты.На самом деле, он даже не имеет переменных-членов, поэтому на самом деле нечего сериализовать (все, что вам нужно знать, это класс этой вещи; она не несет с собой никакой дополнительной информации).

Но когда вы определяете метод seqOp для некоторых VeryLargeObject

class VeryLargeObject {
  val referencesToMillionOtherObjects: Array[Any]
  def seqOp(acc: Int, article: WikipediaArticle) = ...
}

, а затем пытаетесь использовать seqOp в вашем aggregate методе, spark должен сериализовать экземпляр VeryLargeObjectи вместе с ним все его транзитивные зависимости, а затем отправить его по сети на рабочие узлы.Этот процесс, вероятно, не завершается в течение разумного периода времени, и поэтому кажется, что все приложение заморожено.

1 голос
/ 24 апреля 2019

Метод RDD aggregate ожидает двоичный оператор function в качестве параметра seqOp:

def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

То, что вы определили ниже, это метод (а не функция):

def seqOp(acc: Int, article: WikipediaArticle): Int = {
  acc + (if (article.mentionsLanguage(lang)) 1 else 0)
}

Вот как бы вы определили seqOp как функцию:

val seqOp = (acc: Int, article: WikipediaArticle) => {
  acc + (if (article.mentionsLanguage(lang)) 1 else 0)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...