операции pyspark не увеличиваются - PullRequest
1 голос
/ 29 мая 2020

У меня есть код в записной книжке, который работает нормально, но не работает с большими данными с бесконечными вычислениями и java .lang.OutOfMemoryError: Java пространство кучи .

Процесс выглядит следующим образом:

имитация данных pyspark

Я начинаю с фрейма данных с 3 столбцами, а именно (Пользователь, Время и Элемент), как насмешливо в коде ниже:

    from pyspark.sql.types import *
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    import pandas as pd
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)

    df_schema = StructType([ StructField("User", StringType(), True)\
                       ,StructField("Time", IntegerType(), True)\
                       ,StructField("Item", StringType(), True)])
    pddf = pd.DataFrame([["u1",1,"A"],
                    ["u1",1,"A"],
                    ["u1",2,"A"],
                    ["u1",3,"B"],
                    ["u1",3,"C"],
                    ["u1",4,"B"],
                    ["u2",1,"D"],
                    ["u2",2,"D"],
                    ["u2",2,"A"],
                    ["u2",2,"F"],
                    ["u2",3,"D"],
                    ["u2",3,"A"],],columns=["User", "Time", "Item"])

    df = spark.createDataFrame(pddf,schema=df_schema)
    df.show()

, что дает

+----+----+----+
|User|Time|Item|
+----+----+----+
|  u1|   1|   A|
|  u1|   1|   A|
|  u1|   2|   A|
|  u1|   3|   B|
|  u1|   3|   C|
|  u1|   4|   B|
|  u2|   1|   D|
|  u2|   2|   D|
|  u2|   2|   A|
|  u2|   2|   F|
|  u2|   3|   D|
|  u2|   3|   A|
+----+----+----+

промежуточный шаг

Затем я вычисляю topn наиболее распространенных элементов для каждого пользователя и создаю фрейм данных с новым столбцом u c (u c для необычных), который устанавливается в 0, если элемент в списке topn или 1.

    import pyspark.sql.functions as F
    from pyspark.sql import Window
    ArrayOfTupleType = ArrayType(StructType([
        StructField("itemId", StringType(), False),
        StructField("count", IntegerType(), False)
    ]))

    @F.udf(returnType=ArrayOfTupleType)
    def most_common(x, topn=2):
        from collections import Counter
        c = Counter(x)
        mc = c.most_common(topn)
        return mc
    topn=2
    w0 = Window.partitionBy("User")
    dfd = (df.withColumn("Item_freq", most_common(F.collect_list("Item").over(w0), F.lit(topn)))
             .select("User", "Time" , "Item" , "Item_freq")
             .withColumn("mcs", F.col("Item_freq.itemId"))
             .withColumn("uc", F.when(F.expr("array_contains(mcs, Item)"), 0).otherwise(1)).cache())

    dfd.select("User", "Time", "Item" , "mcs" , "uc").show()

, что дает промежуточный фрейм данных ниже

+----+----+----+------+---+
|User|Time|Item|mcs   |uc |
+----+----+----+------+---+
|u1  |1   |A   |[A, B]|0  |
|u1  |1   |A   |[A, B]|0  |
|u1  |2   |A   |[A, B]|0  |
|u1  |3   |B   |[A, B]|0  |
|u1  |3   |C   |[A, B]|1  |
|u1  |4   |B   |[A, B]|0  |
|u2  |1   |D   |[D, A]|0  |
|u2  |2   |D   |[D, A]|0  |
|u2  |2   |A   |[D, A]|0  |
|u2  |2   |F   |[D, A]|1  |
|u2  |3   |D   |[D, A]|0  |
|u2  |3   |A   |[D, A]|0  |
+----+----+----+------+---+

шаг агрегирования

Затем я, наконец, группирую по пользователю и времени, что является операцией, которая не удалась на реальных данных :

    uncommon = dfd.groupBy("User", "Time").agg(F.sum(F.col("uc")).alias("UncommonItem"))
    uncommon.orderBy("User", "Time", ascending=True).show()

, что дает ожидаемые результаты на фиктивных данных

+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|u1  |1   |0           |
|u1  |2   |0           |
|u1  |3   |1           |
|u1  |4   |0           |
|u2  |1   |0           |
|u2  |2   |1           |
|u2  |3   |0           |
+----+----+------------+

, но это не удалось с java .lang.OutOfMemoryE rror: Java пространство кучи на реальных данных.

Увеличение spark.driver.memory с 6G до 60G приводит только к тому, что cra sh появляется через гораздо большее время, пока не заполнит 60G. Мои реальные данные содержат 1907505 входных выборок

Я не очень разбираюсь в pyspark и не уверен, откуда взялась проблема. Многие другие операции groupby / agg в другом месте выполняются быстро и не дают сбоев для того же типа данных. Поэтому я подозреваю, что проблема связана с тем, как я создал свой фрейм данных dfd на промежуточном этапе выше.

Есть идеи, как оптимизировать код?

1 Ответ

1 голос
/ 29 мая 2020

Если вы согласны изменить подход, вы можете попробовать следующее:

import pyspark.sql.functions as F

topn=2
w = Window.partitionBy('User','Item')
df1 = df.withColumn("Counts",F.count('Item').over(w))

w1 = Window.partitionBy(df1["User"]).orderBy(df1['Counts'].desc())

(df1.withColumn("dummy",F.when(F.dense_rank().over(w1)<=topn,0).otherwise(1))
.groupBy('User','Time').agg(F.max("dummy").alias('UncommonItem'))).show()

+----+----+------------+
|User|Time|UncommonItem|
+----+----+------------+
|  u1|   1|           0|
|  u1|   2|           0|
|  u1|   3|           1|
|  u1|   4|           0|
|  u2|   1|           0|
|  u2|   2|           1|
|  u2|   3|           0|
+----+----+------------+

Шаги, указанные в ответе:

  1. получить счетчик в окне пользователя и элемента
  2. получить плотный_ранк вместо пользователя и счетчик, возвращенный на шаге 1
  3. везде, где ранг находится в пределах 2 (сверху), вернуть 1 иначе 0 и назвать его фиктивным
  4. группа по пользователю и времени и получить максимум манекена
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...