Как хранить данные в датафреймах в Pyspark - PullRequest
0 голосов
/ 21 мая 2018

Я новичок в Pyspark и пытаюсь выяснить, как хранить данные в датафрейме.У меня есть таблица размера n x 8, где n очень большой.Допустим, df имеет x1, x2, x3, x4, x5, x6, x7, x8 столбцов.

4 из 8 столбцов остаютсяконстанта в цикле (x1, x2, x3, x4) и изменение 4 столбцов (x5, x6, x7, x8).

for i in range(1, iterations):
   df = df.alias("L").join(df_second.alias("R"), (df.x1 == df_second.x1), how="left")\
        .select("L.*", sum(col("w")*col("x7")).over(Window.partitionBy("x1")).alias("x8"))\
        .distinct()\
        .sort('x1')
   df = df.withColumn("x5", col('x6'))
   df = df.withColumn("x6", col('x5') - col("x1")*(col("x3") - col("x4")))
   df = df.withColumn("x6", when(df.sampled > 0, df.x2).otherwise(df.x6))
   df = df.withColumn("x7", 2*col('x6') - col("x5")*col("x8"))

Это работает медленнее с каждой итерацией и с n больше 50000 У меня проблемы с памятью.Я провел исследование и выяснил функции cache и persist, но не могу понять, как правильно их использовать.

У меня следующие вопросы:

  1. Должен ли я хранить постоянные и изменяющиеся столбцы в разных фреймах данных (df1 и df2 соответственно)?Если это так, я должен использовать d1.persist?
  2. Что произойдет, если я сделаю df = df.cache() и перезапишу кэшированный df с помощью операции, подобной df = df.withColumn("x5", col('x6'))?Будет ли df все еще кэшироваться или мне нужно как-то сначала очистить df из памяти, а потом сделать df = df.withColumn("x5", col('x6')).cache()?
  3. Как сделать сплит-данные для фрагментов и работать с ними, чтобы я не запускалсяНедостаточно памяти?
  4. Я пытался заставить ленивые преобразования вызывать
    df = sc.parallelize(df.collect()).toDF().cache(), что сильно замедляло программу, но каждая итерация требовала почти одинакового времени.Есть ли правильный способ сделать это?Контрольные точки сделали это еще медленнее.

1 Ответ

0 голосов
/ 23 мая 2018

@ Решение Гохта, вероятно, лучше.Но вот еще один способ форсировать вычисления и упорство.

def cache_on_parquet(df):
    TEMP = 'temp.parquet'
    df.write.parquet(TEMP, mode='overwrite')
    return sqlc.read.parquet(TEMP)

После одной или нескольких итераций

df = cache_on_parquet(df)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...