У меня есть этот Dataframe
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|_id |details__line_items |searchable_tags|
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|307131663|[[line_item_1345678912345678M, {}, {},, loan, 1,, 116000], [line_item_123456789, {}, {},, Test, 1,, 1234567], [line_item_2kZgNnPXvEgnKCAaM, {}, {},, loan, 1,, 1234]]|[] |
|040013496|[[line_item_1345678912345678M, {}, {},, loan, 1,, 116000], [line_item_123456789, {}, {},, Test, 1,, 1234567], [line_item_2kZgNnPXvEgnKCAaM, {}, {},, loan, 1,, 1234]]|[] |
+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
Я взрываю столбец details__line_items с помощью этой функции:
def getArrayDataFrame(df: DataFrame): ListBuffer[DataFrame] = {
df.schema
.filter(field => {
field.dataType.typeName == "array"
})
.map(field => {
val explodeColumn = (colsName: String) =>
df.withColumn("items", explode(df.col(s"${field.name}")))
.select("_id", colsName)
field.dataType match {
case arrayType: ArrayType => {
arrayType.elementType.typeName match {
case "struct" => explodeColumn("items.*")
case _ => explodeColumn(s"${field.name}")
}
}
}
})
.to[ListBuffer]
}
Я получаю этот Dataframe:
+---------+---------------------------+--------------+---------------+-----------+----+--------+----+----------+
|_id |_id |antifraud_info|contextual_data|description|name|quantity|sku |unit_price|
+---------+---------------------------+--------------+---------------+-----------+----+--------+----+----------+
|307131663|line_item_1345678912345678M|{} |{} |null |loan|1 |null|116000 |
|307131663|line_item_123456789 |{} |{} |null |Test|1 |null|1234567 |
|307131663|line_item_2kZgNnPXvEgnKCAaM|{} |{} |null |loan|1 |null|1234 |
|040013496|line_item_1345678912345678M|{} |{} |null |loan|1 |null|116000 |
|040013496|line_item_123456789 |{} |{} |null |Test|1 |null|1234567 |
|040013496|line_item_2kZgNnPXvEgnKCAaM|{} |{} |null |loan|1 |null|1234 |
+---------+---------------------------+--------------+---------------+-----------+----+--------+----+----------+
Как я могу получить новый Dataframe, подобный этому?
+---------+---+---------------------------+-------------------+--------------------+----------------+---------+-------------+--------+---------------+
|_id |index|_id |antifraud_info|contextual_data|description|name|quantity|sku|unit_price|
+---------+---+---------------------------+-------------------+--------------------+----------------+---------+-------------+--------+---------------+
|307131663|0 |line_item_1345678912345678M|{} |{} |null |loan |1 |null |116000 |
|307131663|1 |line_item_123456789 |{} |{} |null |Test |1 |null |1234567 |
|307131663|2 |line_item_2kZgNnPXvEgnKCAaM|{} |{} |null |loan |1 |null |1234 |
|040013496|0 |line_item_1345678912345678M|{} |{} |null |loan |1 |null |116000 |
|040013496|1 |line_item_123456789 |{} |{} |null |Test |1 |null |1234567 |
|040013496|2 |line_item_2kZgNnPXvEgnKCAaM|{} |{} |null |loan |1 |null |1234 |
+---------+---+---------------------------+-------------------+--------------------+----------------+---------+-------------+--------+---------------+
Я уже пытался использовать posexplode
, но он меняет мою схему данных, добавляя столбцы col и pos, я изменил свою функцию следующим образом.
def getArrayDataFrame(df: DataFrame): ListBuffer[DataFrame] = {
df.schema
.filter(field => {
field.dataType.typeName == "array"
})
.map{ (field) => {
println(s"This is the name of the field ${field.name}")
val testDF = df.select($"_id", posexplode(df.col(s"${field.name}") ))
testDF.printSchema()
val newDF = testDF.select(flattenSchema(testDF.schema): _*)
newDF.printSchema()
newDF
}}
.to[ListBuffer]
}
Итак, как я могу получить индекс разнесенного столбца, не изменяя мою схему Dataframe?