Spark: Как преобразовать несколько строк в одну строку с несколькими столбцами? - PullRequest
0 голосов
/ 20 сентября 2018

ПРИМЕЧАНИЕ. Это только краткий пример данных.Не имеет смысла по сравнению с реальной командой по крикету.

У меня есть файл JSON, как показано ниже:

{
  "someID": "a5cf4922f4e3f45",
  "payload": {
    "teamID": "1",
    "players": [
      {
        "type": "Batsman",
        "name": "Amar",
        "address": {
          "state": "Gujarat"
        }
      },
      {
        "type": "Bowler",
        "name": "Akbar",
        "address": {
          "state": "Telangana"
        }
      },
      {
        "type": "Fielder",
        "name": "Antony",
        "address": {
          "state": "Kerala"
        }
      }
    ]
  }
}

Я взорвал это с помощью приведенного ниже кода:

df_record = spark.read.json("path-to-file.json",multiLine=True)

df_player_dtls = df_record.select("payload.teamID", explode("payload.players").alias("xplayers")) \
                          .select("teamID", \
                                  "xplayers.type", \
                                  "xplayers.name", \
                                  "xplayers.address.state")

df_player_dtls.createOrReplaceTempView("t_player_dtls")

spark.sql("SELECT * FROM t_player_dtls").show()

Таким образом, в настоящее время вывод выглядит следующим образом:

+--------+---------+--------+------------+
| TeamID |  Type   |  Name  |   State    |
+--------+---------+--------+------------+
|      1 | Batsman | Amar   | Gujarat    |
|      1 | Bowler  | Akbar  | Telangana  |
|      1 | Fielder | Antony | Kerala     |
|      2 | Batsman | John   | Queensland |
|      2 | Bowler  | Smith  | Perth      |
+--------+---------+--------+------------+

Я хочу преобразовать его в показанный ниже формат:

+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| TeamID | Batsman.Name | Batsman.State | Bowler.Name | Bowler.State | Fielder.Name | Fielder.State |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
|      1 | Amar         | Gujarat       | Akbar       | Telangana    | Antony       | Kerala        |
|      2 | John         | Queensland    | Smith       | Perth        | null         | null          |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+

Там будет только по одному игроку каждого типа в команде, и в каждой команде может быть не более четырех типов игроков ( Игрок с битой, Боулер, Филдер и Стражник ).Таким образом, максимальное количество игроков в каждой команде - четыре.Следовательно, финальная таблица, в которой будут храниться эти данные, имеет девять столбцов (по одному для идентификатора команды и имени и состояния для четырех игроков).

Возможно ли это сделать в Spark?Я новичок в Spark, и поэтому ответы, которые объясняют шаги, будут с благодарностью.

Ответы [ 2 ]

0 голосов
/ 20 сентября 2018

Мы можем использовать функцию pivot pyspark

from pyspark.sql.functions import first

df = df_player_dtls.groupBy("TeamID").pivot("Type").agg(
                            first('Name').alias('Name'),
                            first("State").alias("State"))
df.show(10,False)
0 голосов
/ 20 сентября 2018

Это возможно с SQL, который не самый эффективный способ (UDF будет), но он работает.И извините, что это Scala-ish.

val res = spark.sql(
        """select teamID
          |, Batsman.name as `Batsman.name`, Batsman.state as `Batsman.state`
          |, Bowler.name as `Bowler.name`, Bowler.state as `Bowler.state`
          |, Fielder.name as `Fielder.name`, Fielder.state as `Fielder.state`
          |from (
          |   select teamID,
          |     max(case type when 'Batsman' then info end) as Batsman
          |     , max(case type when 'Bowler' then info end) as Bowler
          |     , max(case type when 'Fielder' then info end) as Fielder
          |     from (select teamID, type, struct(name, state) as info from t_player_dtls) group by teamID
      |)""".stripMargin)

Я использовал group by для поворота данных вокруг столбца teamID, и max выберет значение, которое не является нулевым, caseОператор разрешит только одну запись в max .Чтобы упростить комбинирование с максимальным регистром, я использовал функцию struct , которая создает составной столбец info , состоящий из полезной нагрузки, которую мы позже хотим поднять в плоскую схему.

UDF будетбыли более эффективными, но я не знаком с Python.

ОБНОВЛЕНИЕ Оба решения (SQL и Pivot) используют explode и groupBy combo, @Anshuman гораздо проще кодировать, имея следующие планы выполнения:

SQL

== Physical Plan ==
SortAggregate(key=[teamID#10], functions=[max(CASE WHEN (type#16 = Batsman) THEN info#31 END), max(CASE WHEN (type#16 = Bowler) THEN info#31 END), max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(teamID#10, 200)
      +- SortAggregate(key=[teamID#10], functions=[partial_max(CASE WHEN (type#16 = Batsman) THEN info#31 END), partial_max(CASE WHEN (type#16 = Bowler) THEN info#31 END), partial_max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
     +- *Sort [teamID#10 ASC NULLS FIRST], false, 0
        +- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, named_struct(name, xplayers#12.name, state, xplayers#12.address.state) AS info#31]
           +- Generate explode(payload#4.players), true, false, [xplayers#12]
              +- *Project [payload#4]
                 +- Scan ExistingRDD[payload#4,someID#5]

PIVOT

== Physical Plan ==
SortAggregate(key=[TeamID#10], functions=[first(if ((Type#16 <=> Batsman)) Name#17 else null, true), first(if ((Type#16 <=> Batsman)) State#18 else null, true), first(if ((Type#16 <=> Bowler)) Name#17 else null, true), first(if ((Type#16 <=> Bowler)) State#18 else null, true), first(if ((Type#16 <=> Fielder)) Name#17 else null, true), first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(TeamID#10, 200)
  +- SortAggregate(key=[TeamID#10], functions=[partial_first(if ((Type#16 <=> Batsman)) Name#17 else null, true), partial_first(if ((Type#16 <=> Batsman)) State#18 else null, true), partial_first(if ((Type#16 <=> Bowler)) Name#17 else null, true), partial_first(if ((Type#16 <=> Bowler)) State#18 else null, true), partial_first(if ((Type#16 <=> Fielder)) Name#17 else null, true), partial_first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
     +- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
        +- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, xplayers#12.name AS name#17, xplayers#12.address.state AS state#18]
           +- Generate explode(payload#4.players), true, false, [xplayers#12]
              +- *Project [payload#4]
                 +- Scan ExistingRDD[payload#4,someID#5]

Оба вызывают случайное перемешивание ( Обмен хэш-разделами (TeamID # 10, 200) *).

Если ваша цель - производительность, вы могли бы использовать этот подход Scala (я не знаю Python)

import org.apache.spark.sql.functions._

  val df_record = spark.read.json(Seq(row_1, row_2).toDS)

  //Define your custom player types, as many as needed
  val playerTypes = Seq("Batsman", "Bowler", "Fielder")

  //Return type for the UDF
  val returnType = StructType(playerTypes.flatMap(t => Seq(StructField(s"$t.Name", StringType), StructField(s"$t.State", StringType))))

  val unpackPlayersUDF = udf( (players: Seq[Row]) => {
    val playerValues: Map[String, Row] = players.map(p => (p.getAs[String]("type"), p)).toMap
    val arrangedValues = playerTypes.flatMap { t =>
      val playerRow = playerValues.get(t) //if type does not exist, than value will be None, which is null
      Seq(
        playerRow.map(_.getAs[String]("name"))
        , playerRow.map(_.getAs[Row]("address").getAs[String]("state"))
      )
    }
    Row(arrangedValues: _*)
  }
  , returnType)

  val udfRes = df_record
    .withColumn("xplayers", unpackPlayersUDF($"payload.players"))
    .select("payload.teamID", "xplayers.*")

  udfRes.show(false)
  udfRes.explain()

Вывод:

+------+------------+-------------+-----------+------------+------------+-------------+
|teamID|Batsman.Name|Batsman.State|Bowler.Name|Bowler.State|Fielder.Name|Fielder.State|
+------+------------+-------------+-----------+------------+------------+-------------+
|1     |Amar        |Gujarat      |Akbar      |Telangana   |Antony      |Kerala       |
|1     |John        |Queensland   |Smith      |Perth       |null        |null         |
+------+------------+-------------+-----------+------------+------------+-------------+

При следующем плане выполнения:

== Physical Plan ==
*Project [payload#4.teamID AS teamID#46, UDF(payload#4.players).Batsman.Name AS Batsman.Name#40, UDF(payload#4.players).Batsman.State AS Batsman.State#41, UDF(payload#4.players).Bowler.Name AS Bowler.Name#42, UDF(payload#4.players).Bowler.State AS Bowler.State#43, UDF(payload#4.players).Fielder.Name AS Fielder.Name#44, UDF(payload#4.players).Fielder.State AS Fielder.State#45]
+- Scan ExistingRDD[payload#4,someID#5]

Перестановка не включена.Если вы хотите еще больше повысить производительность, то добавление явной схемы чтения к spark.read.schem (SCHEMA) .json поможет в дальнейшем, поскольку читателям не придется выводить схему, что экономит время.

...