Искра позволяет разбить датафрейм на части для топанд - PullRequest
0 голосов
/ 26 октября 2018

У меня есть 10 миллионов записей данных. Мое требование - мне нужно выполнить некоторые операции с этими данными в пандах, и у меня нет памяти для того, чтобы все 10 миллионов записей находились в пандах одновременно. Поэтому я хочу иметь возможность разбивать его на части и использовать toPandas на каждом чанке

df = sqlContext.sql("select * from db.table")
#do chunking to take X records at a time
#how do I generated chunked_df?
p_df = chunked_df.toPandas()
#do things to p_df

Как я могу разделить свой фрейм данных на равные x-части или на части по количеству записей, скажем, 1 миллион за один раз. Любое решение приемлемо, мне просто нужно обрабатывать его небольшими порциями.

1 Ответ

0 голосов
/ 26 октября 2018

Один из вариантов - использовать toLocalIterator в сочетании с repartition и mapPartitions.

import pandas as pd

columns = spark_df.schema.fieldNames()
chunks = spark_df.repartition(num_chunks).rdd.mapPartitions(lambda iterator: [pd.DataFrame(list(iterator), columns=columns)]).toLocalIterator()
for pdf in chunks:
    # do work locally on chunk as pandas df

При использовании toLocalIterator только один раздел за раз собирается для драйвера.

Другой вариант, который, на мой взгляд, предпочтительнее, состоит в том, чтобы распределить вашу работу по кластеру по блокам панд в каждом разделе. Это может быть достигнуто с помощью pandas_udf:

from pyspark.sql.functions import spark_partition_id, pandas_udf, PandasUDFType

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def transform_pandas_df_chunk(pdf):
    result_pdf = ...
    # do ditributed work on a chunk of the original spark dataframe as a pandas dataframe
    return result_pdf

spark_df = spark_df.repartition(num_chunks).groupby(spark_partition_id()).apply(transform_pandas_df_chunk)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...