Предположим, у меня есть 4 паркета, каждый в своем классе дел, как показано ниже:
Case Class BusinessRaw(
@id node_id: Long,
businessName: Option[String],
incorporation_date: Option[java.sql.Date]
)
Case Class StaffRaw(
@id node_id: Long,
name: Option[String],
birth_date: Option[java.sql.Date]
)
Case Class EdgesRaw(
@id start_node_id: Long,
end_node_id: Option[Long],
types: Option[String]
)
Case Class AddressRaw( //contains both business and staff address
@id node_id: Long,
address: Option[String],
types: Option[String]
)
Мне нужно присоединиться к паркету 4 и сопоставить его с одним вложенным классом дела, как показано ниже:
Case Class Network(
@id network_id: Long, //BusinessRaw.node_id
businessName: Option[String],
incorporation_date: Option[java.sql.Date],
address: Option[String], //business address
types: Option[String]
Case Class staff(
@id staff_id: Long, // StaffRaw.node_id
name: Option[String],
birth_date: Option[java.sql.Date],
staff_address: Option[String] //staff address
)
)
Примечание. Синтаксис моего вложенного класса дел может быть неправильным. Это ожидаемая структура.
Мой scala код, как показано ниже:
// read parquet as datasets
val businessRawDS = spark.read.parquet(bussinessPath).as[businessRaw]
val edgesRawDS = spark.read.parquet(edgesPath).as[edgesRaw]
val addressRawDS = spark.read.parquet(addressPath).as[addressRaw]
val staffRawDS = spark.read.parquet(staffPath).as[staffRaw]
// join datasets base on conditions
val allJoinDS = edgesRawDS //To Do filter columns after generating data quality report
.join(businessRawDS, edgesRawDS("start_id") === entityRawDS("node_id"), "left_outer")
.join(addressRawDS, edgesRawDS("start_id") === addressRawDS("node_id"), "left_outer")
.join(staffRawDS, edgesRawDS("start_id") === staffRawDS("node_id"), "left_outer")
.join(businessRawDS, edgesRawDS("end_id") === businessRawDS("node_id"), "left_outer")
.join(addressRawDS, edgesRawDS("end_id") === addressRawDS("node_id"), "left_outer")
.join(staffRawDS, edgesRawDS("end_id") === staffRawDS("node_id"), "left_outer")
как мне использовать allJoinDS.map для получения соответствующих полей из карты нескольких наборов данных во вложенный класс case ? Мне не удалось найти какую-либо соответствующую информацию в Интернете.