Один из вариантов - использовать toLocalIterator
в сочетании с repartition
и mapPartitions
.
import pandas as pd
columns = spark_df.schema.fieldNames()
chunks = spark_df.repartition(num_chunks).rdd.mapPartitions(lambda iterator: [pd.DataFrame(list(iterator), columns=columns)]).toLocalIterator()
for pdf in chunks:
# do work locally on chunk as pandas df
При использовании toLocalIterator
только один раздел за раз собирается для драйвера.
Другой вариант, который, на мой взгляд, предпочтительнее, состоит в том, чтобы распределить вашу работу по кластеру по блокам панд в каждом разделе. Это может быть достигнуто с помощью pandas_udf
:
from pyspark.sql.functions import spark_partition_id, pandas_udf, PandasUDFType
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def transform_pandas_df_chunk(pdf):
result_pdf = ...
# do ditributed work on a chunk of the original spark dataframe as a pandas dataframe
return result_pdf
spark_df = spark_df.repartition(num_chunks).groupby(spark_partition_id()).apply(transform_pandas_df_chunk)