Ускоряется ли Pandas (), когда размер фрейма данных pyspark уменьшается? - PullRequest
0 голосов
/ 21 января 2020

Я подумал, что задам вопрос. Я нашел умный способ уменьшить размер PySpark Dataframe и преобразовать его в Pandas, и мне было просто интересно, становится ли функция to Pandas быстрее, когда размер кадра pyspark становится меньше? Вот некоторый код:

window = Window.partitionBy(F.lit('A')).orderBy(F.lit('A'))

eps_tfs = {}
while True:
    pdf = toPandas(conn.select(F.col('*')).where(F.col('row_number') <= 2500))
    n = len(pdf)
    trigger = 0
    for u in pdf['features']:
        indices = [i for i, x in enumerate(u) if x == 1.0]
        for idx in range(len(eps_columns)):
            if idx in indices:
                try:
                    eps_tfs[eps_columns[idx]].append(True)
                except:
                    eps_tfs[eps_columns[idx]] = [True]
            else:
                try:
                    eps_tfs[eps_columns[idx]].append(False)
                except:
                    eps_tfs[eps_columns[idx]] = [False]
    full_view = full_view.append(pd.concat([pdf, pd.DataFrame(eps_tfs)], axis=1))
    conn = conn.select(F.col('*')).where(F.col('row_number') > 2500)
    conn = conn.drop("row_number")
    conn = conn.select(F.col('*'), F.row_number().over(window).alias('row_number'))
    eps_tfs = {}
    del pdf
    if n < 2500:
        break

Кроме того, является ли следующий код действительно более быстрым способом сопоставления фрейма данных с pandas?

def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand

Есть ли лучший способ для go об этом?

1 Ответ

1 голос
/ 22 января 2020

здесь - это исходный код To Pandas,

И, прежде всего, да, до Pandas будет быстрее, если ваш фрейм данных pyspark становится меньше, он имеет аналогичный на вкус как sdf.collect ()
Разница в том, чтобы Pandas вернуть pdf и собрать возвращать список.
Как видно из исходного кода pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) pdf генерируется из pd.DataFrame.from_records from Список!

Так что, если ваш sdf меньше, там будет меньше данных для передачи по сети, и from_record обрабатывает меньше данных, используя ЦП вашего драйвера.

Дизайн второго код другой, sdf распространяется, код вызывает Mappartition, поэтому все работники генерируют Pandas фрейм данных из подмножества данных, затем он вызывает метод сбора, теперь весь Pandas фрейм данных, передаваемый по сети, передается в драйвер , Затем код вызывает pd.concat для объединения всех данных.

Преимущества:

  1. При преобразовании в Pandas DataFrame все работники работают в небольшом подмножестве параллельные данные гораздо лучше, чем выводить все данные в драйвер и сжигать процессор вашего драйвера, чтобы преобразовать гигантские данные в Pandas.
  2. Идет перераспределение, то есть, если ваш набор данных огромен, и у вас есть Небольшое количество разделов, данные на каждом разделе будут огромными, и до Pandas будет сбой на OOM сериализатора, а также очень медленный сбор данных

Недостатки:

  1. теперь, когда вы собираете, вы не собираете собственные данные sdf, вместо pandas кадра данных, к которому прикреплено больше метаданных и обычно больше, что означает, что общий размер объекта больше
  2. pd.concat медленный лол, но все же может быть лучше, чем from_record

Так что нет универсального заключения о том, какой метод лучше, но выбирайте мудро, какой инструмент использовать. Как и в этом вопросе, Pandas может быть быстрее, чем маленький sdf, но для больших sdf фрагмент кода определенно работает лучше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...