Я новичок в Pyspark и пытаюсь выяснить, как хранить данные в датафрейме.У меня есть таблица размера n x 8
, где n
очень большой.Допустим, df
имеет x1
, x2
, x3
, x4
, x5
, x6
, x7
, x8
столбцов.
4 из 8 столбцов остаютсяконстанта в цикле (x1
, x2
, x3
, x4
) и изменение 4 столбцов (x5
, x6
, x7
, x8
).
for i in range(1, iterations):
df = df.alias("L").join(df_second.alias("R"), (df.x1 == df_second.x1), how="left")\
.select("L.*", sum(col("w")*col("x7")).over(Window.partitionBy("x1")).alias("x8"))\
.distinct()\
.sort('x1')
df = df.withColumn("x5", col('x6'))
df = df.withColumn("x6", col('x5') - col("x1")*(col("x3") - col("x4")))
df = df.withColumn("x6", when(df.sampled > 0, df.x2).otherwise(df.x6))
df = df.withColumn("x7", 2*col('x6') - col("x5")*col("x8"))
Это работает медленнее с каждой итерацией и с n больше 50000
У меня проблемы с памятью.Я провел исследование и выяснил функции cache
и persist
, но не могу понять, как правильно их использовать.
У меня следующие вопросы:
- Должен ли я хранить постоянные и изменяющиеся столбцы в разных фреймах данных (
df1
и df2
соответственно)?Если это так, я должен использовать d1.persist
? - Что произойдет, если я сделаю
df = df.cache()
и перезапишу кэшированный df
с помощью операции, подобной df = df.withColumn("x5", col('x6'))
?Будет ли df
все еще кэшироваться или мне нужно как-то сначала очистить df
из памяти, а потом сделать df = df.withColumn("x5", col('x6')).cache()
? - Как сделать сплит-данные для фрагментов и работать с ними, чтобы я не запускалсяНедостаточно памяти?
- Я пытался заставить ленивые преобразования вызывать
df = sc.parallelize(df.collect()).toDF().cache()
, что сильно замедляло программу, но каждая итерация требовала почти одинакового времени.Есть ли правильный способ сделать это?Контрольные точки сделали это еще медленнее.