PySpark - карта с лямбда-функцией - PullRequest
0 голосов
/ 24 июня 2019

Я сталкиваюсь с проблемой при смешивании функций python map и lambda в среде Spark.

Учитывая df1, мой исходный фрейм данных:

Animals     | Food      | Home
----------------------------------
Monkey      | Banana    | Jungle
Dog         | Meat      | Garden
Cat         | Fish      | House
Elephant    | Banana    | Jungle
Lion        | Meat      | Desert

Я хочу создать другой фрейм данных df2,Он будет содержать два столбца со строкой на столбец df1 (3 в моем примере).Первый столбец будет содержать имя столбца df1.Второй столбец будет содержать массив элементов с наибольшим количеством вхождений (n = 3 в приведенном ниже примере) и счетчиком.

Column      | Content
-----------------------------------------------------------
Animals     | [("Cat", 1), ("Dog", 1), ("Elephant", 1)]
Food        | [("Banana", 2), ("Meat", 2), ("Fish", 1)]
Home        | [("Jungle", 2), ("Desert", 1), ("Garden", 1)]

Я пытался сделать это с помощью списков Python, функций map и lambda, но ябыли конфликты с функциями PySpark:

def transform(df1):
    # Number of entry to keep per row
    n = 3
    # Add a column for the count of occurence
    df1 = df1.withColumn("future_occurences", F.lit(1))

    df2 = df1.withColumn("Content",
        F.array(
            F.create_map(
                lambda x: (x,
                    [
                        str(row[x]) for row in df1.groupBy(x).agg(
                            F.sum("future_occurences").alias("occurences")
                        ).orderBy(
                            F.desc("occurences")
                        ).select(x).limit(n).collect()
                    ]
                ), df1.columns
            )
        )
    )
    return df2

Ошибка:

TypeError: Invalid argument, not a string or column: <function <lambda> at 0x7fc844430410> of type <type 'function'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Есть идеи, как это исправить?

Большое спасибо!

1 Ответ

2 голосов
/ 24 июня 2019

Вот одно из возможных решений, в котором столбец Content будет массивом StructType с двумя именованными полями: Content и count.

from pyspark.sql.functions import col, collect_list, desc, lit, struct
from functools import reduce 

def transform(df, n):
    return reduce(
        lambda a, b: a.unionAll(b),
        (
            df.groupBy(c).count()\
                .orderBy(desc("count"), c)\
                .limit(n)\
                .withColumn("Column", lit(c))\
                .groupBy("Column")\
                .agg(
                    collect_list(
                        struct(
                            col(c).cast("string").alias("Content"), 
                            "count")
                    ).alias("Content")
                )
            for c in df.columns
        )
    )

Эта функция будет перебирать каждый из столбцов входного кадра данных df и подсчитывать вхождение каждого значения. Затем мы orderBy считаем число (по убыванию) и значение столбца самостоятельно (в алфавитном порядке) и сохраняем только первые n строки (limit(n)).

Затем соберите значения в массив структур и, наконец, union соберите результаты для каждого столбца. Поскольку union требует, чтобы каждый DataFrame имел одну и ту же схему, вам необходимо преобразовать значение столбца в строку.

n = 3
df1 = transform(df, n)
df1.show(truncate=False)
#+-------+------------------------------------+
#|Column |Content                             |
#+-------+------------------------------------+
#|Animals|[[Cat,1], [Dog,1], [Elephant,1]]    |
#|Food   |[[Banana,2], [Meat,2], [Fish,1]]    |
#|Home   |[[Jungle,2], [Desert,1], [Garden,1]]|
#+-------+------------------------------------+

Это не точно тот же вывод, который вы просили, но, вероятно, будет достаточно для ваших нужд. (У Spark нет кортежей, как вы описали.) Вот новая схема:

df1.printSchema()
#root
# |-- Column: string (nullable = false)
# |-- Content: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- Content: string (nullable = true)
# |    |    |-- count: long (nullable = false)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...