Обновление
Если вы начинаете с DataFrame со следующей схемой:
ddf.printSchema()
#root
# |-- a: string (nullable = true)
# |-- d: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- _1: long (nullable = true)
# | | |-- _2: long (nullable = true)
Вы должны использовать pyspark.sql.functions.explode
, чтобы разбить массив встолбцы, но после этого вы можете использовать селектор *
, чтобы превратить структуру в столбцы:
from pyspark.sql.functions import explode
row_breakdown = ddf.select("a", explode("d").alias("d")).select("a", "d.*")
row_breakdown.show()
#+------+---+---+
#| a| _1| _2|
#+------+---+---+
#| stuff| 1| 2|
#| stuff| 3| 4|
#|stuff2| 1| 2|
#|stuff2| 3| 4|
#+------+---+---+
А чтобы переименовать столбцы, вы можете использовать понимание списка с помощью str.replace
:
from pyspark.sql.functions import col
row_breakdown = row_breakdown.select(
*[col(c).alias(c.replace("_", "value")) for c in row_breakdown.columns]
)
row_breakdown.show()
#+------+------+------+
#| a|value1|value2|
#+------+------+------+
#| stuff| 1| 2|
#| stuff| 3| 4|
#|stuff2| 1| 2|
#|stuff2| 3| 4|
#+------+------+------+
Оригинальный ответ
Если вы начинаете со словаря, вам совсем не нужно использовать pandas
для этого.
Вместо этого вы можете создать свой DataFrame прямо из своего словаря.Ключ должен преобразовать ваш словарь в соответствующий формат , а затем использовать его для построения вашего Spark DataFrame.
В вашем примере кажется, что вы не используете значения под a
ключ на всех.
Как я упомянул в моем комментарии , вы можете получить описанный результат с помощью следующего кода:
df_dict = {
'a': {
"1": "stuff", "2": "stuff2"
},
"d": {
"1": [(1, 2), (3, 4)], "2": [(1, 2), (3, 4)]
}
}
from itertools import chain
row_breakdown = spark.createDataFrame(
chain.from_iterable(df_dict["d"].values()), ["value1", "value2"]
)
row_breakdown.show()
#+------+------+
#|value1|value2|
#+------+------+
#| 1| 2|
#| 3| 4|
#| 1| 2|
#| 3| 4|
#+------+------+
Если вы хотите столбец, похожий на индекс, вы можетедостигните этого, просто используя enumerate
, как в следующем примере.Здесь я также сортирую значения по ключу, поскольку это, по-видимому, является вашим намерением.
data = (
(i,) + v for i, v in enumerate(
chain.from_iterable(
v for k, v in sorted(df_dict["d"].items(), key=lambda (key, val): key)
)
)
)
columns = ["index", "value1", "value2"]
row_breakdown = spark.createDataFrame(data, columns)
row_breakdown.show()
#+-----+------+------+
#|index|value1|value2|
#+-----+------+------+
#| 0| 1| 2|
#| 1| 3| 4|
#| 2| 1| 2|
#| 3| 3| 4|
#+-----+------+------+
Как вы можете видеть здесь, мы можем передать выражение генератора в spark.createDataFrame
, и это решение нетребует от нас заранее знать длину кортежей.