Дает ли flatmap лучшую производительность, чем filter + map? - PullRequest
4 голосов
/ 25 июня 2019

У меня довольно большой набор данных (100 миллионов + записей с сотнями столбцов), которые я обрабатываю с помощью spark. Я считываю данные в набор искровых данных и хочу отфильтровать этот набор данных и сопоставить подмножество его полей с классом наблюдений.

код выглядит несколько похожим,

case class Subset(name:String,age:Int)
case class Complete(name:String,field1:String,field2....,age:Int)

val ds = spark.read.format("csv").load("data.csv").as[Complete]

#approach 1
ds.filter(x=>x.age>25).map(x=> Subset(x.name,x.age))

#approach 2
ds.flatMap(x=>if(x.age>25) Seq(Subset(x.name,x.age)) else Seq.empty)

Какой подход лучше? Любые дополнительные советы о том, как я могу сделать этот код более производительным?

Спасибо!

Редактировать

Я провел несколько тестов для сравнения времени выполнения, и похоже, что подход 2 довольно быстрый, код, который я использовал для получения времени выполнения, выглядит следующим образом:

val subset = spark.time {
   ds.filter(x=>x.age>25).map(x=> Subset(x.name,x.age))
}

spark.time {
   subset.count()
}

and 

val subset2 = spark.time {
   ds.flatMap(x=>if(x.age>25) Seq(Subset(x.name,x.age)) else Seq.empty)
}

spark.time {
   subset2.count()
}

Ответы [ 2 ]

3 голосов
/ 26 июня 2019

Обновление : Мой оригинальный ответ содержал ошибку: Spark поддерживает Seq как результат flatMap (и преобразует результат обратно в Dataset),Извиняюсь за путаницу.Я также добавил дополнительную информацию об улучшении производительности вашего анализа.

Обновление 2 : я пропустил, что вы используете Dataset вместо RDD (doh!).Это не оказывает существенного влияния на ответ.

Spark - это распределенная система, которая распределяет данные по нескольким узлам и обрабатывает данные параллельно.С точки зрения эффективности, действия, которые приводят к перераспределению (требующему передачи данных между узлами), намного дороже с точки зрения времени выполнения, чем изменения на месте.Также следует отметить, что операции, которые просто преобразуют данные , такие как filter, map, flatMap и т. Д., Просто сохраняются и не выполняются до действия выполняется операция (например, reduce, fold, aggregate и т. д.).Следовательно, ни одна альтернатива на самом деле ничего не делает в том виде, в каком она существует.

Когда действие выполняется в результате этих преобразований, я ожидаю, что операция filter будет гораздо более эффективной: она обрабатывает только данные (используяпоследующая операция map, которая передает предикат x=>x.age>25 (более типично записываемый как _.age > 25).Хотя может показаться, что filter создает промежуточную коллекцию, она выполняется лениво.В результате Spark объединяет операции filter и map.

Ваша операция flatMap, откровенно говоря, отвратительна.Это форсирует обработку, создание последовательности и последующее выравнивание каждого элемента данных, что, безусловно, увеличит общую обработку.

Тем не менее, лучший способ улучшить производительность вашего анализа - это управление секционированием, чтобы данные былиразделить примерно равномерно по максимально возможному количеству узлов.См. это руководство как хорошую отправную точку.

0 голосов
/ 26 июня 2019

Судя по логике синтаксиса, первый подход должен использовать меньше места, так как flatMap распространяется на .map (). Flatten, оба аргумента равного размера. Он компилируется в тот же Java-байт-код в Scala REPL (правка: при использовании примера с домашним животным, который явно не компенсирует фактическое тестирование его со сравнительно большими данными).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...