Как добавить столбцы во вложенную структуру (struct) в Scala/Spark - PullRequest
1 голос
/ 28 февраля 2020

У меня есть два DataFrame (A и B): схема A состоит из вложенных структур (struct), а схема B — плоская (см. ниже). Я хочу добавить столбцы из B внутрь структуры package в A и получить результат C:

A:

root
|-- package: struct (nullable = true)
|    |-- globalPackageId: long (nullable = true)
|    |-- naPackageId: string (nullable = true)
|    |-- packageName: string (nullable = true)
|-- supplies: struct (nullable = true)
|    |-- supplyMask: integer (nullable = true)
|    |-- supplyIds: array (nullable = true)
|    |    |-- element: integer (containsNull = true)
|-- timestampDetails: struct (nullable = true)
|    |-- packageTimestamp: string (nullable = true)
|    |-- onboardTimestamp: string (nullable = true)

B:

root
 |-- globalPackageId: long (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_address: string (nullable = true)
 |-- order_number: integer (nullable = true)

C:

root
|-- package: struct (nullable = true)
|    |-- globalPackageId: long (nullable = true)
|    |-- naPackageId: string (nullable = true)
|    |-- packageName: string (nullable = true)
|    |-- order_id: long (nullable = true)
|    |-- order_address: string (nullable = true)
|    |-- order_number: integer (nullable = true)
|-- supplies: struct (nullable = true)
|    |-- supplyMask: integer (nullable = true)
|    |-- supplyIds: array (nullable = true)
|    |    |-- element: integer (containsNull = true)
|-- timestampDetails: struct (nullable = true)
|    |-- packageTimestamp: string (nullable = true)
|    |-- onboardTimestamp: string (nullable = true)

Я пытался использовать .withColumn(struct("xxx"), "xxx"), но результат по-прежнему не соответствует ожидаемому. Есть ли у кого-нибудь опыт решения такой задачи?

Спасибо,

1 Ответ

0 голосов
/ 29 февраля 2020

Вот одно из решений с использованием внутреннего соединения (inner join) и select:

import org.apache.spark.sql.functions.struct

// Inner-join A with B on the package id, then rebuild every struct column of A.
// B's flat order columns are appended inside the "package" struct, which yields
// schema C. Each rebuilt struct needs an explicit alias: without one Spark names
// the column after the struct(...) expression instead of the original name.
dfA.join(dfB, dfA("package.globalPackageId") === dfB("globalPackageId"), "inner")
   .select(
       struct(
            dfA("package.globalPackageId"),
            dfA("package.naPackageId"),
            dfA("package.packageName"),
            dfB("order_id"),
            dfB("order_address"),
            dfB("order_number")
       ).as("package"),
       struct(
            dfA("supplies.supplyMask"),
            dfA("supplies.supplyIds")
       ).as("supplies"),
       struct(
            dfA("timestampDetails.packageTimestamp"),
            dfA("timestampDetails.onboardTimestamp")
       ).as("timestampDetails") // previously missing, so the column lost its name
   )

Или, в качестве альтернативы, используйте Dataset вместе с case-классами, чтобы получить строгую типизацию в Spark. Этот подход требует больше кода, но case-классы могут упростить сопровождение. Вот второе решение:

// Case classes for the typed Dataset approach, declared leaf-first.
// `package` is a reserved word in Scala, so the struct field is named `_package`.

/** Mirrors the "supplies" struct of A. */
case class supplyInfo(supplyMask: Int, supplyIds: Seq[Int])

/** Mirrors the "timestampDetails" struct of A. */
case class tsDetails(packageTimestamp: String, onboardTimestamp: String)

/** Mirrors the "package" struct of A. */
case class packageInfo(globalPackageId: Long, naPackageId: String, packageName: String)

/** The "package" struct of A widened with the order columns of B (struct of C). */
case class packageInfoExtended(
  globalPackageId: Long,
  naPackageId: String,
  packageName: String,
  order_id: Long,
  order_address: String,
  order_number: Int
)

/** One row of B. */
case class orderInfo(globalPackageId: Long, order_id: Long, order_address: String, order_number: Int)

/** One row of A. */
case class recordInfo(_package: packageInfo, supplies: supplyInfo, timestampDetails: tsDetails)

/** One row of C. */
case class recordInfoExtended(_package: packageInfoExtended, supplies: supplyInfo, timestampDetails: tsDetails)

// Sample data for A and B. `.toDS` presumably comes from `import spark.implicits._`
// (in scope by default in spark-shell) — NOTE(review): confirm outside the shell.
// supplyIds is declared as Seq[Int], so pass a Seq directly: implicitly converting
// an Array to an immutable Seq is deprecated in Scala 2.13 (it copies the array).
val dsA = Seq(
  recordInfo(packageInfo(1, "r334", "packA"), supplyInfo(234, Seq(2, 4)), tsDetails("2020-02-20 12:34:23", "2020-02-20 12:34:39")),
  recordInfo(packageInfo(2, "r366", "packB"), supplyInfo(444, Seq(3, 5)), tsDetails("2020-02-20 01:10:00", "2020-02-20 12:34:23"))
).toDS

val dsB = Seq(
  orderInfo(1, 56, "Athens Exarhia Square", 7889),
  orderInfo(2, 100, "Amsterdam Dam Square", 2211)
).toDS

// Typed inner join: joinWith keeps each side as its case class, yielding
// Dataset[(recordInfo, orderInfo)]. Fields are then read by name with
// compile-time checking instead of positional Row.getStruct(i) / getAs[T]
// lookups, which only fail at runtime if the column order or types change.
val finalDs = dsA.joinWith(dsB, dsA("_package.globalPackageId") === dsB("globalPackageId"), "inner")
   .map { case (a, b) =>
     recordInfoExtended(
       packageInfoExtended(
         a._package.globalPackageId,
         a._package.naPackageId,
         a._package.packageName,
         b.order_id,
         b.order_address,
         b.order_number
       ),
       // supplies and timestampDetails pass through from A unchanged
       a.supplies,
       a.timestampDetails
     )
   }

// Display the joined rows; truncate = false keeps long cell values intact.
finalDs.show(truncate = false)

// +-------------------------------------------------+-------------+------------------------------------------+
// |_package                                         |supplies     |timestampDetails                          |
// +-------------------------------------------------+-------------+------------------------------------------+
// |[1, r334, packA, 56, Athens Exarhia Square, 7889]|[234, [2, 4]]|[2020-02-20 12:34:23, 2020-02-20 12:34:39]|
// |[2, r366, packB, 100, Amsterdam Dam Square, 2211]|[444, [3, 5]]|[2020-02-20 01:10:00, 2020-02-20 12:34:23]|
// +-------------------------------------------------+-------------+------------------------------------------+

// Display the resulting nested schema (explicit () instead of auto-application).
finalDs.printSchema()

// root
//  |-- _package: struct (nullable = true)
//  |    |-- globalPackageId: long (nullable = false)
//  |    |-- naPackageId: string (nullable = true)
//  |    |-- packageName: string (nullable = true)
//  |    |-- order_id: long (nullable = false)
//  |    |-- order_address: string (nullable = true)
//  |    |-- order_number: integer (nullable = false)
//  |-- supplies: struct (nullable = true)
//  |    |-- supplyMask: integer (nullable = false)
//  |    |-- supplyIds: array (nullable = true)
//  |    |    |-- element: integer (containsNull = false)
//  |-- timestampDetails: struct (nullable = true)
//  |    |-- packageTimestamp: string (nullable = true)
//  |    |-- onboardTimestamp: string (nullable = true)
...