Искра: ленивое действие? - PullRequest
0 голосов
/ 25 июня 2018

Я работаю над сложным приложением. Из исходных данных мы вычисляем много статистики, например.

val df1 = sourceData.filter($"col1" === "val" and ...)
     .select(...)
     .groupBy(...)
     .min()

val df2 = sourceData.filter($"col2" === "val" and ...)
     .select(...)
     .groupBy(...)
     .count()

Поскольку кадры данных сгруппированы по одним и тем же столбцам, кадры данных результатов сгруппированы вместе:

df1.join(df2, Seq("groupCol"), "full_outer")
    .join(df3....) 
    .write.save(...)

(в моем коде это делается в цикле)

Это не производительно, проблема в том, что каждый фрейм данных (у меня их около 30) заканчивается действием, поэтому в моем понимании каждый фрейм данных вычисляется и возвращается драйверу, который затем отправляет данные исполнителям для выполнения соединения. .

Это дает мне ошибку памяти, я могу увеличить память драйвера, но я ищу лучший способ сделать это. Например если бы все кадры данных были вычислены только в конце (с сохранением объединенного кадра данных), я думаю, что все будет управляться кластером.

Есть ли способ сделать что-то вроде ленивого действия? Или я должен присоединиться к кадрам данных по-другому?

Thx

1 Ответ

0 голосов
/ 25 июня 2018

Прежде всего, код, который вы показали, содержит только одну операцию, похожую на действие - DataFrameWriter.save.Все остальные компоненты ленивы.

Но лень тут не особо помогает.Самая большая проблема (при условии отсутствия уродливого перекоса данных или неправильной настройки широковещания) заключается в том, что отдельные агрегации требуют отдельных перемешиваний и дорогостоящего последующего слияния.

Наивным решением было бы использовать следующее:

кадр данных сгруппирован в те же столбцы

, чтобы перетасовать в первую очередь:

val groupColumns: Seq[Column] = ???

val sourceDataPartitioned =  sourceData.groupBy(groupColumns: _*)

и использовать результат для вычисления отдельных агрегатов

val df1 = sourceDataPartitioned
  ...

val df2 = sourceDataPartitioned
  ...

Однако этоТакой подход довольно хрупок и вряд ли будет масштабироваться при наличии больших / искаженных групп.

Поэтому было бы намного лучше переписать ваш код для выполнения только агрегации.К счастью для вас, стандартное поведение SQL - это все, что вам нужно.

Давайте начнем с структурирования вашего кода в три кортежа элементов:

  • _1, являющийсяПредикат (условие, которое вы используете с filter).
  • _2 - список из Columns, для которого вы хотите вычислить агрегаты.
  • _3 - функция агрегирования.

Где пример структуры может выглядеть так:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{count, min}

val ops: Seq[(Column, Seq[Column], Column => Column)] = Seq(
  ($"col1" === "a" and $"col2" === "b", Seq($"col3", $"col4"), count),
  ($"col2" === "b" and $"col3" === "c", Seq($"col4", $"col5"), min)
)

Теперь вы составляете агрегированные выражения, используя

agg_function(when(predicate, column)) 

pattern

import org.apache.spark.sql.functions.when

val exprs: Seq[Column] = ops.flatMap {
    case (p, cols, f) => cols.map {
      case c => f(when(p, c))
    }
}

и используйте его на sourceData

sourceData.groupBy(groupColumns: _*).agg(exprs.head, exprs.tail: _*)

Добавьте aliases при необходимости.

...