Pyspark SQL Pandas Grouped Map без GroupBy? - PullRequest
       46

Pyspark SQL Pandas Grouped Map без GroupBy?

1 голос
/ 06 ноября 2019

У меня есть набор данных, который я хочу отобразить с использованием нескольких Pyspark SQL Сгруппированных карт UDF, на разных этапах более крупного процесса ETL, который выполняется на эфемерных кластерах в AWS EMR. API Grouped Map требует, чтобы кадр данных Pyspark был сгруппирован до применения, но мне не нужно фактически группировать ключи.

В настоящее время я использую произвольную группировку, которая работает, но в результате:

  1. Ненужное перемешивание.

  2. Хакерский код для произвольной группы в каждой работе.

Мое идеальное решение позволяет применять векторизованный UDF Pandas без произвольной группировки, но если бы я мог сохранить произвольную группировку, которая по крайней мере исключила бы перемешивание.

EDIT :

Вот как выглядит мой код. Первоначально я использовал произвольную группировку, но в настоящее время пытаюсь spark_partition_id() на основе комментария ниже @ pault.


@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
  b = a_partition.drop("pid", axis=1)
  # Some other transform stuff
  return b

(sql
  .read.parquet(a_path)
  .withColumn("pid", spark_partition_id())
  .groupBy("pid")
  .apply(transform)
  .write.parquet(b_path))

Использование spark_partition_id(), кажется, все еще приводит к случайному перемешиванию. Я получаю следующий DAG:

Этап 1

  1. Сканирование паркета
  2. Проект
  3. Проект
  4. Обмен

Этап 2

  1. Обмен
  2. Сортировка
  3. FlatMapGroupsInPandas
...