чанки топанды из спаркфрейма - PullRequest
0 голосов
/ 25 октября 2018

У меня есть искровой фрейм с 10 миллионами записей и 150 столбцами.Я пытаюсь преобразовать его в панду DF.

x = df.toPandas()
# do some things to x

И он не работает с ordinal must be >= 1.Я предполагаю, что это потому, что он слишком большой, чтобы справиться сразу.Можно ли разделить его на части и преобразовать в DF для панд для каждого фрагмента?

Полный стек:

ValueError                                Traceback (most recent call last)
<command-2054265283599157> in <module>()
    158 from db.table where snapshot_year_month=201806""")
--> 159 ps = x.toPandas()
    160 # ps[["pol_nbr",
    161 # "pol_eff_dt",

/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   2029                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   2030         else:
-> 2031             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   2032 
   2033             dtype = {}

/databricks/spark/python/pyspark/sql/dataframe.py in collect(self)
    480         with SCCallSiteSync(self._sc) as css:
    481             port = self._jdf.collectToPython()
--> 482         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    483

1 Ответ

0 голосов
/ 07 февраля 2019

При условии, что ваша таблица имеет целочисленный ключ / индекс, вы можете использовать цикл + запрос для чтения кусков большого фрейма данных.

Я держусь подальше от df.toPandas(), который несет много накладных расходов.Вместо этого у меня есть вспомогательная функция, которая преобразует результаты запроса pyspark, который представляет собой список Row экземпляров, в pandas.DataFrame.

In [1]: from pyspark.sql.functions import col

In [2]: from pyspark.sql import SparkSession

In [3]: import numpy as np

In [4]: import pandas as pd

In [5]: def to_pandas(rows):
       :     row_dicts = [r.asDict() for r in rows]
       :     return pd.DataFrame.from_dict(row_dicts)
       :

Чтобы увидеть эту функцию в действии,давайте создадим небольшой пример кадра данных.

In [6]: from string import ascii_letters
       : n = len(ascii_letters)
       : df = pd.DataFrame({'id': range(n),
       :                    'num': np.random.normal(10,1,n),
       :                    'txt': list(ascii_letters)})
       : df.head()
Out [7]:
   id        num txt
0   0   9.712229   a
1   1  10.281259   b
2   2   8.342029   c
3   3  11.115702   d
4   4  11.306763   e


In [ 8]: spark = SparkSession.builder.appName('Ops').getOrCreate()
       : df_spark = spark.createDataFrame(df)
       : df_spark
Out[ 9]: DataFrame[id: bigint, num: double, txt: string]

Чанки собираются путем фильтрации по индексу.

In [10]: chunksize = 25
       : for i in range(0, n, chunksize):
       :     chunk = (df_spark.
       :               where(col('id').between(i, i + chunksize)).
       :               collect())
       :     pd_df = to_pandas(chunk)
       :     print(pd_df.num.mean())
       :
9.779573360741152
10.23157424753804
9.550750629366462
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...