Вот одно из решений вашей задачи с использованием внутреннего соединения (inner join) и `select`:
import org.apache.spark.sql.functions.struct
// Inner-join dfA with dfB on the package id and rebuild the three nested structs,
// folding dfB's order columns into the `package` struct.
// NOTE(review): assumes dfA has struct columns `package`, `supplies` and `timestampDetails`
// and dfB has a top-level `globalPackageId` column — confirm against the real schemas.
dfA.join(dfB, dfA("package.globalPackageId") === dfB("globalPackageId"), "inner")
  .select(
    struct(
      dfA("package.globalPackageId"),
      dfA("package.naPackageId"),
      dfA("package.packageName"),
      dfB("order_id"),
      dfB("order_address"),
      dfB("order_number")
    ).as("package"),
    struct(
      dfA("supplies.supplyMask"),
      dfA("supplies.supplyIds")
    ).as("supplies"),
    // Alias explicitly like the other structs; without it Spark would name the column
    // after the generated struct expression instead of `timestampDetails`.
    struct(
      dfA("timestampDetails.packageTimestamp"),
      dfA("timestampDetails.onboardTimestamp")
    ).as("timestampDetails")
  )
Либо, как альтернатива, используйте Dataset вместе с case-классами, чтобы получить строгую типизацию в Spark. Этот подход, безусловно, требует больше кода, зато case-классы могут упростить сопровождение кода. Вот второе решение:
// Case classes modelling the nested record schemas used by the typed (Dataset) solution.
// Leaf types are declared before the types that reference them, so the definitions also
// work when pasted line-by-line into spark-shell / the Scala REPL, where every line is
// compiled separately and forward references to not-yet-defined types fail.

/** Core package identifiers (the nested package struct of each record). */
case class packageInfo(globalPackageId: Long, naPackageId: String, packageName: String)

/** Supply details: a mask plus the list of supply ids. */
case class supplyInfo(supplyMask: Int, supplyIds: Seq[Int])

/** Package timestamps, kept as plain strings (e.g. "2020-02-20 12:34:23"). */
case class tsDetails(packageTimestamp: String, onboardTimestamp: String)

/** One order row; `globalPackageId` is the key used to join it to a package. */
case class orderInfo(globalPackageId: Long, order_id: Long, order_address: String, order_number: Int)

/** [[packageInfo]] enriched with the order columns from [[orderInfo]]. */
case class packageInfoExtended(
  globalPackageId: Long,
  naPackageId: String,
  packageName: String,
  order_id: Long,
  order_address: String,
  order_number: Int
)

// `package` is a reserved word in Scala, hence the `_package` field name
// (a backquoted `package` identifier would also compile).
/** An input record: package, supplies and timestamps. */
case class recordInfo(_package: packageInfo, supplies: supplyInfo, timestampDetails: tsDetails)

/** An output record whose package struct also carries the matching order's columns. */
case class recordInfoExtended(_package: packageInfoExtended, supplies: supplyInfo, timestampDetails: tsDetails)
// Sample package records for the typed solution.
// `supplyIds` is declared as Seq[Int], so build it with `Seq` directly: passing an `Array`
// relies on the implicit Array-to-immutable.Seq conversion, which copies the array and is
// deprecated in Scala 2.13.
// `.toDS` assumes the Spark implicits are in scope (`import spark.implicits._`, which
// spark-shell imports automatically).
val dsA = Seq(
  recordInfo(packageInfo(1, "r334", "packA"), supplyInfo(234, Seq(2, 4)), tsDetails("2020-02-20 12:34:23", "2020-02-20 12:34:39")),
  recordInfo(packageInfo(2, "r366", "packB"), supplyInfo(444, Seq(3, 5)), tsDetails("2020-02-20 01:10:00", "2020-02-20 12:34:23"))
).toDS
// Sample orders; their globalPackageId values (1 and 2) match the package ids in dsA.
// Named arguments make explicit which literal fills which column.
val dsB = Seq(
  orderInfo(globalPackageId = 1, order_id = 56, order_address = "Athens Exarhia Square", order_number = 7889),
  orderInfo(globalPackageId = 2, order_id = 100, order_address = "Amsterdam Dam Square", order_number = 2211)
).toDS
// Typed join: `joinWith` returns a Dataset of (recordInfo, orderInfo) pairs that the
// case-class encoders decode, so fields are read through their Scala accessors instead of
// by position / by string name from an untyped Row. This also avoids `Row.getAs[Seq[Int]]`,
// an unchecked cast that can fail at runtime when Spark materializes the array column as a
// different Seq implementation (e.g. a mutable ArraySeq under Scala 2.13).
val finalDs = dsA
  .joinWith(dsB, dsA("_package.globalPackageId") === dsB("globalPackageId"), "inner")
  .map { case (rec, order) =>
    val pack = rec._package
    recordInfoExtended(
      packageInfoExtended(
        pack.globalPackageId,
        pack.naPackageId,
        pack.packageName,
        order.order_id,
        order.order_address,
        order.order_number
      ),
      // The supplies and timestamp structs are carried over unchanged.
      rec.supplies,
      rec.timestampDetails
    )
  }
// Print the joined rows with full cell contents (no truncation of long values).
finalDs.show(truncate = false)
// +-------------------------------------------------+-------------+------------------------------------------+
// |_package |supplies |timestampDetails |
// +-------------------------------------------------+-------------+------------------------------------------+
// |[1, r334, packA, 56, Athens Exarhia Square, 7889]|[234, [2, 4]]|[2020-02-20 12:34:23, 2020-02-20 12:34:39]|
// |[2, r366, packB, 100, Amsterdam Dam Square, 2211]|[444, [3, 5]]|[2020-02-20 01:10:00, 2020-02-20 12:34:23]|
// +-------------------------------------------------+-------------+------------------------------------------+
// Print the resulting schema: the order columns are now nested inside `_package`.
// `printSchema()` is a side-effecting method declared with `()`, so call it with parens
// (auto-application is deprecated in Scala 2.13 and rejected by Scala 3).
finalDs.printSchema()
// root
// |-- _package: struct (nullable = true)
// | |-- globalPackageId: long (nullable = false)
// | |-- naPackageId: string (nullable = true)
// | |-- packageName: string (nullable = true)
// | |-- order_id: long (nullable = false)
// | |-- order_address: string (nullable = true)
// | |-- order_number: integer (nullable = false)
// |-- supplies: struct (nullable = true)
// | |-- supplyMask: integer (nullable = false)
// | |-- supplyIds: array (nullable = true)
// | | |-- element: integer (containsNull = false)
// |-- timestampDetails: struct (nullable = true)
// | |-- packageTimestamp: string (nullable = true)
// | |-- onboardTimestamp: string (nullable = true)