This can be done with SQL, which is not the most efficient way (a UDF would be), but it works. And apologies that it is Scala-ish.
val res = spark.sql(
"""select teamID
|, Batsman.name as `Batsman.name`, Batsman.state as `Batsman.state`
|, Bowler.name as `Bowler.name`, Bowler.state as `Bowler.state`
|, Fielder.name as `Fielder.name`, Fielder.state as `Fielder.state`
|from (
| select teamID,
| max(case type when 'Batsman' then info end) as Batsman
| , max(case type when 'Bowler' then info end) as Bowler
| , max(case type when 'Fielder' then info end) as Fielder
| from (select teamID, type, struct(name, state) as info from t_player_dtls) group by teamID
|)""".stripMargin)
I used group by to pivot the data around the teamID column, and max picks a value that is not null; the case statement lets only one record into each max. To simplify the max-with-case combination I used the struct function, which creates a composite column info holding the payload that we later want to lift into a flat schema.
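To see why this works, here is a minimal toy illustration (hypothetical inline data, not the question's schema): only the row matching the CASE survives, every other row contributes NULL, and max ignores NULLs, so exactly one struct remains per group.
val demo = spark.sql(
"""select teamID
|, max(case type when 'Batsman' then info end) as Batsman
|from values
|  (1, 'Batsman', struct('Amar', 'Gujarat')),
|  (1, 'Bowler',  struct('Akbar', 'Telangana')) as t(teamID, type, info)
|group by teamID""".stripMargin)
demo.show(false)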
A UDF would have been more efficient, but I am not familiar with Python.
UPDATE Both solutions (SQL and Pivot) use the explode and groupBy combo; @Anshuman's is much easier to code. They have the following execution plans:
SQL
== Physical Plan ==
SortAggregate(key=[teamID#10], functions=[max(CASE WHEN (type#16 = Batsman) THEN info#31 END), max(CASE WHEN (type#16 = Bowler) THEN info#31 END), max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(teamID#10, 200)
+- SortAggregate(key=[teamID#10], functions=[partial_max(CASE WHEN (type#16 = Batsman) THEN info#31 END), partial_max(CASE WHEN (type#16 = Bowler) THEN info#31 END), partial_max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
+- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, named_struct(name, xplayers#12.name, state, xplayers#12.address.state) AS info#31]
+- Generate explode(payload#4.players), true, false, [xplayers#12]
+- *Project [payload#4]
+- Scan ExistingRDD[payload#4,someID#5]
PIVOT
== Physical Plan ==
SortAggregate(key=[TeamID#10], functions=[first(if ((Type#16 <=> Batsman)) Name#17 else null, true), first(if ((Type#16 <=> Batsman)) State#18 else null, true), first(if ((Type#16 <=> Bowler)) Name#17 else null, true), first(if ((Type#16 <=> Bowler)) State#18 else null, true), first(if ((Type#16 <=> Fielder)) Name#17 else null, true), first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(TeamID#10, 200)
+- SortAggregate(key=[TeamID#10], functions=[partial_first(if ((Type#16 <=> Batsman)) Name#17 else null, true), partial_first(if ((Type#16 <=> Batsman)) State#18 else null, true), partial_first(if ((Type#16 <=> Bowler)) Name#17 else null, true), partial_first(if ((Type#16 <=> Bowler)) State#18 else null, true), partial_first(if ((Type#16 <=> Fielder)) Name#17 else null, true), partial_first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
+- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, xplayers#12.name AS name#17, xplayers#12.address.state AS state#18]
+- Generate explode(payload#4.players), true, false, [xplayers#12]
+- *Project [payload#4]
+- Scan ExistingRDD[payload#4,someID#5]
Both involve a shuffle (Exchange hashpartitioning(TeamID#10, 200)).
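For reference, here is a sketch of what the explode + groupBy + pivot combo might look like, reconstructed from the PIVOT plan above rather than taken from @Anshuman's answer (df_record is the DataFrame read from the sample JSON, as in the snippet further below):
import org.apache.spark.sql.functions._
import spark.implicits._
//Explode the players array, flatten the fields we need, then pivot on the player type
val pivoted = df_record
  .select($"payload.teamID".as("TeamID"), explode($"payload.players").as("player"))
  .select($"TeamID", $"player.type".as("Type"), $"player.name".as("Name"), $"player.address.state".as("State"))
  .groupBy($"TeamID")
  .pivot("Type", Seq("Batsman", "Bowler", "Fielder"))
  .agg(first($"Name", ignoreNulls = true), first($"State", ignoreNulls = true))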
If performance is your goal, you could use this Scala approach (I do not know Python):
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val df_record = spark.read.json(Seq(row_1, row_2).toDS)
//Define your custom player types, as many as needed
val playerTypes = Seq("Batsman", "Bowler", "Fielder")
//Return type for the UDF
val returnType = StructType(playerTypes.flatMap(t => Seq(StructField(s"$t.Name", StringType), StructField(s"$t.State", StringType))))
val unpackPlayersUDF = udf( (players: Seq[Row]) => {
val playerValues: Map[String, Row] = players.map(p => (p.getAs[String]("type"), p)).toMap
val arrangedValues = playerTypes.flatMap { t =>
    val playerRow = playerValues.get(t) //if the type does not exist, the value will be None, which becomes null
Seq(
playerRow.map(_.getAs[String]("name"))
, playerRow.map(_.getAs[Row]("address").getAs[String]("state"))
)
}
Row(arrangedValues: _*)
}
, returnType)
val udfRes = df_record
.withColumn("xplayers", unpackPlayersUDF($"payload.players"))
.select("payload.teamID", "xplayers.*")
udfRes.show(false)
udfRes.explain()
Output:
+------+------------+-------------+-----------+------------+------------+-------------+
|teamID|Batsman.Name|Batsman.State|Bowler.Name|Bowler.State|Fielder.Name|Fielder.State|
+------+------------+-------------+-----------+------------+------------+-------------+
|1 |Amar |Gujarat |Akbar |Telangana |Antony |Kerala |
|1 |John |Queensland |Smith |Perth |null |null |
+------+------------+-------------+-----------+------------+------------+-------------+
With the following execution plan:
== Physical Plan ==
*Project [payload#4.teamID AS teamID#46, UDF(payload#4.players).Batsman.Name AS Batsman.Name#40, UDF(payload#4.players).Batsman.State AS Batsman.State#41, UDF(payload#4.players).Bowler.Name AS Bowler.Name#42, UDF(payload#4.players).Bowler.State AS Bowler.State#43, UDF(payload#4.players).Fielder.Name AS Fielder.Name#44, UDF(payload#4.players).Fielder.State AS Fielder.State#45]
+- Scan ExistingRDD[payload#4,someID#5]
No shuffle is involved. If you want to boost performance even further, adding an explicit read schema via spark.read.schema(SCHEMA).json will help, since the readers will not have to infer the schema, which saves time.
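For example, a minimal sketch of such an explicit schema; the field names mirror the columns used above, but the exact types (e.g. teamID and someID as strings) are assumptions:
import org.apache.spark.sql.types._
//Assumed schema: adjust the field types to match your actual JSON
val playerType = StructType(Seq(
  StructField("name", StringType),
  StructField("type", StringType),
  StructField("address", StructType(Seq(StructField("state", StringType))))
))
val schema = StructType(Seq(
  StructField("someID", StringType),
  StructField("payload", StructType(Seq(
    StructField("teamID", StringType),
    StructField("players", ArrayType(playerType))
  )))
))
val df_record_typed = spark.read.schema(schema).json(Seq(row_1, row_2).toDS)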