У меня есть код в записной книжке, который работает нормально, но не работает с большими данными с бесконечными вычислениями и java .lang.OutOfMemoryError: Java пространство кучи .
Процесс выглядит следующим образом:
имитация данных pyspark
Я начинаю с фрейма данных с 3 столбцами, а именно (Пользователь, Время и Элемент), как насмешливо в коде ниже:
from pyspark.sql.types import *
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pandas as pd
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
df_schema = StructType([ StructField("User", StringType(), True)\
,StructField("Time", IntegerType(), True)\
,StructField("Item", StringType(), True)])
pddf = pd.DataFrame([["u1",1,"A"],
["u1",1,"A"],
["u1",2,"A"],
["u1",3,"B"],
["u1",3,"C"],
["u1",4,"B"],
["u2",1,"D"],
["u2",2,"D"],
["u2",2,"A"],
["u2",2,"F"],
["u2",3,"D"],
["u2",3,"A"],],columns=["User", "Time", "Item"])
df = spark.createDataFrame(pddf,schema=df_schema)
df.show()
, что дает
+----+----+----+
|User|Time|Item|
+----+----+----+
| u1| 1| A|
| u1| 1| A|
| u1| 2| A|
| u1| 3| B|
| u1| 3| C|
| u1| 4| B|
| u2| 1| D|
| u2| 2| D|
| u2| 2| A|
| u2| 2| F|
| u2| 3| D|
| u2| 3| A|
+----+----+----+
промежуточный шаг
Затем я вычисляю topn наиболее распространенных элементов для каждого пользователя и создаю фрейм данных с новым столбцом u c (u c для необычных), который устанавливается в 0, если элемент в списке topn или 1.
import pyspark.sql.functions as F
from pyspark.sql import Window
ArrayOfTupleType = ArrayType(StructType([
StructField("itemId", StringType(), False),
StructField("count", IntegerType(), False)
]))
@F.udf(returnType=ArrayOfTupleType)
def most_common(x, topn=2):
from collections import Counter
c = Counter(x)
mc = c.most_common(topn)
return mc
topn=2
w0 = Window.partitionBy("User")
dfd = (df.withColumn("Item_freq", most_common(F.collect_list("Item").over(w0), F.lit(topn)))
.select("User", "Time" , "Item" , "Item_freq")
.withColumn("mcs", F.col("Item_freq.itemId"))
.withColumn("uc", F.when(F.expr("array_contains(mcs, Item)"), 0).otherwise(1)).cache())
dfd.select("User", "Time", "Item" , "mcs" , "uc").show()
, что дает промежуточный фрейм данных ниже
+----+----+----+------+---+
|User|Time|Item|mcs |uc |
+----+----+----+------+---+
|u1 |1 |A |[A, B]|0 |
|u1 |1 |A |[A, B]|0 |
|u1 |2 |A |[A, B]|0 |
|u1 |3 |B |[A, B]|0 |
|u1 |3 |C |[A, B]|1 |
|u1 |4 |B |[A, B]|0 |
|u2 |1 |D |[D, A]|0 |
|u2 |2 |D |[D, A]|0 |
|u2 |2 |A |[D, A]|0 |
|u2 |2 |F |[D, A]|1 |
|u2 |3 |D |[D, A]|0 |
|u2 |3 |A |[D, A]|0 |
+----+----+----+------+---+
шаг агрегирования
Затем я, наконец, группирую по пользователю и времени, что является операцией, которая не удалась на реальных данных :
uncommon = dfd.groupBy("User", "Time").agg(F.sum(F.col("uc")).alias("UncommonItem"))
uncommon.orderBy("User", "Time", ascending=True).show()
, что дает ожидаемые результаты на фиктивных данных
+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|u1 |1 |0 |
|u1 |2 |0 |
|u1 |3 |1 |
|u1 |4 |0 |
|u2 |1 |0 |
|u2 |2 |1 |
|u2 |3 |0 |
+----+----+------------+
, но это не удалось с java .lang.OutOfMemoryE rror: Java пространство кучи на реальных данных.
Увеличение spark.driver.memory с 6G до 60G приводит только к тому, что cra sh появляется через гораздо большее время, пока не заполнит 60G. Мои реальные данные содержат 1907505 входных выборок
Я не очень разбираюсь в pyspark и не уверен, откуда взялась проблема. Многие другие операции groupby / agg в другом месте выполняются быстро и не дают сбоев для того же типа данных. Поэтому я подозреваю, что проблема связана с тем, как я создал свой фрейм данных dfd на промежуточном этапе выше.
Есть идеи, как оптимизировать код?