Как мне использовать стратегию стиля pandas split-apply-combin с scala api in spark? - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть функция scala, которая принимает фрейм данных искры и возвращает одно значение, двойное слово. Функция сложна, использует агрегаты, определенные в классе DataFrame, вызывает другие библиотеки java и не может быть выражена в SQL. Это требует всего содержимого фрейма данных для выполнения вычислений, он не может добавлять строки за раз и наращивать до результата.

У меня есть большой фрейм данных, который содержит столбец, который я хотел бы использовать разбить информационный кадр на маленькие порции и выполнить вышеуказанные вычисления для каждого маленького порции. Затем я хотел бы вернуть новый фрейм данных, содержащий одну строку для каждой группы с двумя столбцами, один из которых содержит значение groupby, а другой - результат.

Это было бы относительно простой задачей с использованием PandasUDF, но я не могу решить как я могу сделать это в Scala.

Я попытался перераспределить фрейм данных, используя группу по столбцу, а затем вызвать mapPartitions, однако функция, передаваемая в mapPartitions, должна иметь подпись Iterator [Row] -> Iterator [ИКС]. Я могу взять Iterator [Row] и создать Seq [Row] или List [Row] достаточно просто, но представляется невозможным создать кадр данных из этого Seq, поскольку вычисления выполняются на рабочих узлах, и создание кадра данных может быть сделано только от водителя. Потребовалось бы много перепроектировать, чтобы переписать исходную функцию для получения Seq [Row], так как она использует некоторые функции агрегации высокого уровня из DataFrame (например, приблизительно Quantile).

Суть проблемы, кажется, заключается в что нет понятия «локальный (/ работник только / не распределенный) фрейм данных» в отличие от Pandas, где фреймы данных явно ограничены, чтобы быть локальными.

Я пропустил что-то очевидное?

1 Ответ

0 голосов
/ 26 апреля 2020

У меня есть большой фрейм данных, который содержит столбец, который я хотел бы использовать, чтобы разбить фрейм данных на маленькие порции и выполнить вышеуказанные вычисления для каждого маленького порция.

Значения в этот столбец известен заранее? Если нет, то они по крайней мере коллекционные? Допустим, вы можете собрать их, как:

val chunkValues: Array[Any] = df.select("chunk")
  .collect()
  .map(r => r.getAs[Any](0))

L oop по значениям, чтобы несколько раз отфильтровать inputDF и выполнить тяжелый лог c:

val chunkDFs: Array[DataFrame] = chunkValues.map(value => {
  val chunkBeforeDF = inputDF.filter(col("chunk") === value)
  val chunkAfterDF = yourLogic(chunkBefore)
})

Union их снова.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...