При условии, что ваша таблица имеет целочисленный ключ / индекс, вы можете использовать цикл + запрос для чтения кусков большого фрейма данных.
Я держусь подальше от df.toPandas()
, который несет много накладных расходов.Вместо этого у меня есть вспомогательная функция, которая преобразует результаты запроса pyspark
, который представляет собой список Row
экземпляров, в pandas.DataFrame
.
In [1]: from pyspark.sql.functions import col
In [2]: from pyspark.sql import SparkSession
In [3]: import numpy as np
In [4]: import pandas as pd
In [5]: def to_pandas(rows):
: row_dicts = [r.asDict() for r in rows]
: return pd.DataFrame.from_dict(row_dicts)
:
Чтобы увидеть эту функцию в действии,давайте создадим небольшой пример кадра данных.
In [6]: from string import ascii_letters
: n = len(ascii_letters)
: df = pd.DataFrame({'id': range(n),
: 'num': np.random.normal(10,1,n),
: 'txt': list(ascii_letters)})
: df.head()
Out [7]:
id num txt
0 0 9.712229 a
1 1 10.281259 b
2 2 8.342029 c
3 3 11.115702 d
4 4 11.306763 e
In [ 8]: spark = SparkSession.builder.appName('Ops').getOrCreate()
: df_spark = spark.createDataFrame(df)
: df_spark
Out[ 9]: DataFrame[id: bigint, num: double, txt: string]
Чанки собираются путем фильтрации по индексу.
In [10]: chunksize = 25
: for i in range(0, n, chunksize):
: chunk = (df_spark.
: where(col('id').between(i, i + chunksize)).
: collect())
: pd_df = to_pandas(chunk)
: print(pd_df.num.mean())
:
9.779573360741152
10.23157424753804
9.550750629366462