Pandas to PySpark: преобразование столбца списков кортежей в отдельные столбцы для каждого элемента кортежа - PullRequest
0 голосов
/ 09 сентября 2018

Мне нужно преобразовать DataFrame, в котором один из столбцов состоит из списка кортежей, каждый элемент в каждом из кортежей должен быть отдельным столбцом.

Вот пример и решение вПанды:

import pandas as pd

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame.from_dict(df_dict)
print(df)  # intial structure

           a    d
    1   stuff   [(1, 2), (3, 4)]
    2   stuff2  [(1, 2), (3, 4)]

# first transformation, let's separate each list item into a new row
row_breakdown = df.set_index(["a"])["d"].apply(pd.Series).stack()
print(row_breakdown)

            a        
    stuff   0    (1, 2)
            1    (3, 4)
    stuff2  0    (1, 2)
            1    (3, 4)
    dtype: object

row_breakdown = row_breakdown.reset_index().drop(columns=["level_1"])
print(row_breakdown)

    a   0
    0   stuff   (1, 2)
    1   stuff   (3, 4)
    2   stuff2  (1, 2)
    3   stuff2  (3, 4)

# second transformation, let's get each tuple item into a separate column
row_breakdown.columns = ["a", "d"]
row_breakdown = row_breakdown["d"].apply(pd.Series)
row_breakdown.columns = ["value_1", "value_2"]
print(row_breakdown)

        value_1 value_2
    0   1   2
    1   3   4
    2   1   2
    3   3   4

Это решение для панд.Я должен быть в состоянии сделать то же самое, но с помощью PySpark (2.3).Я начал работать над этим, но сразу застрял:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)

row_breakdown = ddf.set_index(["a"])["d"].apply(pd.Series).stack()

    AttributeError: 'DataFrame' object has no attribute 'set_index'

Судя по всему, Spark не поддерживает индексацию.Любые указатели оценены.

Ответы [ 2 ]

0 голосов
/ 10 сентября 2018

Обновление

Если вы начинаете с DataFrame со следующей схемой:

ddf.printSchema()
#root
# |-- a: string (nullable = true)
# |-- d: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- _1: long (nullable = true)
# |    |    |-- _2: long (nullable = true)

Вы должны использовать pyspark.sql.functions.explode, чтобы разбить массив встолбцы, но после этого вы можете использовать селектор *, чтобы превратить структуру в столбцы:

from pyspark.sql.functions import explode

row_breakdown = ddf.select("a", explode("d").alias("d")).select("a", "d.*")
row_breakdown.show()
#+------+---+---+
#|     a| _1| _2|
#+------+---+---+
#| stuff|  1|  2|
#| stuff|  3|  4|
#|stuff2|  1|  2|
#|stuff2|  3|  4|
#+------+---+---+

А чтобы переименовать столбцы, вы можете использовать понимание списка с помощью str.replace:

from pyspark.sql.functions import col

row_breakdown = row_breakdown.select(
    *[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
)
row_breakdown.show()
#+------+------+------+
#|     a|value1|value2|
#+------+------+------+
#| stuff|     1|     2|
#| stuff|     3|     4|
#|stuff2|     1|     2|
#|stuff2|     3|     4|
#+------+------+------+

Оригинальный ответ

Если вы начинаете со словаря, вам совсем не нужно использовать pandas для этого.

Вместо этого вы можете создать свой DataFrame прямо из своего словаря.Ключ должен преобразовать ваш словарь в соответствующий формат , а затем использовать его для построения вашего Spark DataFrame.

В вашем примере кажется, что вы не используете значения под a ключ на всех.

Как я упомянул в моем комментарии , вы можете получить описанный результат с помощью следующего кода:

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

from itertools import chain
row_breakdown = spark.createDataFrame(
    chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
)
row_breakdown.show()
#+------+------+
#|value1|value2|
#+------+------+
#|     1|     2|
#|     3|     4|
#|     1|     2|
#|     3|     4|
#+------+------+

Если вы хотите столбец, похожий на индекс, вы можетедостигните этого, просто используя enumerate, как в следующем примере.Здесь я также сортирую значения по ключу, поскольку это, по-видимому, является вашим намерением.

data = (
    (i,) + v for i, v in enumerate(
        chain.from_iterable(
            v for k, v in sorted(df_dict["d"].items(), key=lambda (key, val): key)
        )
    )
)
columns = ["index", "value1", "value2"]
row_breakdown = spark.createDataFrame(data, columns)
row_breakdown.show()
#+-----+------+------+
#|index|value1|value2|
#+-----+------+------+
#|    0|     1|     2|
#|    1|     3|     4|
#|    2|     1|     2|
#|    3|     3|     4|
#+-----+------+------+

Как вы можете видеть здесь, мы можем передать выражение генератора в spark.createDataFrame, и это решение нетребует от нас заранее знать длину кортежей.

0 голосов
/ 10 сентября 2018

Это может сделать:

from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd

conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

df_dict = {
    'a': {
        "1": "stuff", "2": "stuff2"
    }, 

    "d": {
        "1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
    }
}

df = pd.DataFrame(df_dict)
ddf = spark.createDataFrame(df)


exploded = ddf.withColumn('d', F.explode("d"))
exploded.show()

Результат:

+------+------+
|     a|     d|
+------+------+
| stuff|[1, 2]|
| stuff|[3, 4]|
|stuff2|[1, 2]|
|stuff2|[3, 4]|
+------+------+

Мне удобнее использовать SQL для этого:

exploded.createOrReplaceTempView("exploded")
spark.sql("SELECT a, d._1 as value_1, d._2 as value_2 FROM exploded").show()

Важное замечание: причина использования аксессоров _1 и _2 заключается в том, что spark проанализировал кортеж как структуру и дал ему ключи по умолчанию. Если в вашей реальной реализации фрейм данных содержит array<int>, вы должны использовать синтаксис [0].

Окончательный результат:

+------+-------+-------+
|     a|value_1|value_2|
+------+-------+-------+
| stuff|      1|      2|
| stuff|      3|      4|
|stuff2|      1|      2|
|stuff2|      3|      4|
+------+-------+-------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...