У меня есть набор данных, который я хочу отобразить с использованием нескольких Pyspark SQL Сгруппированных карт UDF, на разных этапах более крупного процесса ETL, который выполняется на эфемерных кластерах в AWS EMR. API Grouped Map требует, чтобы кадр данных Pyspark был сгруппирован до применения, но мне не нужно фактически группировать ключи.
В настоящее время я использую произвольную группировку, которая работает, но в результате:
Ненужное перемешивание.
Хакерский код для произвольной группы в каждой работе.
Мое идеальное решение позволяет применять векторизованный UDF Pandas без произвольной группировки, но если бы я мог сохранить произвольную группировку, которая по крайней мере исключила бы перемешивание.
EDIT :
Вот как выглядит мой код. Первоначально я использовал произвольную группировку, но в настоящее время пытаюсь spark_partition_id()
на основе комментария ниже @ pault.
@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
b = a_partition.drop("pid", axis=1)
# Some other transform stuff
return b
(sql
.read.parquet(a_path)
.withColumn("pid", spark_partition_id())
.groupBy("pid")
.apply(transform)
.write.parquet(b_path))
Использование spark_partition_id()
, кажется, все еще приводит к случайному перемешиванию. Я получаю следующий DAG:
Этап 1
- Сканирование паркета
- Проект
- Проект
- Обмен
Этап 2
- Обмен
- Сортировка
- FlatMapGroupsInPandas