spark sql scala - объединение нескольких паркетов [разных классов дел] для сопоставления с одним новым вложенным классом дел - PullRequest
0 голосов
/ 17 января 2020

Предположим, у меня есть 4 паркета, каждый в своем классе дел, как показано ниже:

Case Class BusinessRaw(
                    @id node_id: Long,
                    businessName: Option[String],
                    incorporation_date: Option[java.sql.Date]
    )

Case Class StaffRaw(
                @id node_id: Long,
                name: Option[String],
                birth_date: Option[java.sql.Date]
)


Case Class EdgesRaw(
                @id start_node_id: Long,
                end_node_id: Option[Long],
                types: Option[String]
)

Case Class AddressRaw(                      //contains both business and staff address
                @id node_id: Long,
                address: Option[String],
                types: Option[String]
)

Мне нужно присоединиться к паркету 4 и сопоставить его с одним вложенным классом дела, как показано ниже:

Case Class Network(
                   @id network_id: Long,  //BusinessRaw.node_id
                   businessName: Option[String],
                   incorporation_date: Option[java.sql.Date],
                   address: Option[String],       //business address
                   types: Option[String]
                   Case Class staff(
                                  @id staff_id: Long,    // StaffRaw.node_id
                                  name: Option[String],
                                  birth_date: Option[java.sql.Date],
                                  staff_address: Option[String]     //staff address
                                   )
                   )

Примечание. Синтаксис моего вложенного класса дел может быть неправильным. Это ожидаемая структура.

Мой scala код, как показано ниже:

// read parquet as datasets
    val businessRawDS = spark.read.parquet(bussinessPath).as[businessRaw]
    val edgesRawDS = spark.read.parquet(edgesPath).as[edgesRaw]
    val addressRawDS = spark.read.parquet(addressPath).as[addressRaw]
    val staffRawDS = spark.read.parquet(staffPath).as[staffRaw]


// join datasets base on conditions
    val allJoinDS = edgesRawDS //To Do filter columns after generating data quality report
      .join(businessRawDS, edgesRawDS("start_id") === entityRawDS("node_id"), "left_outer")
      .join(addressRawDS, edgesRawDS("start_id") === addressRawDS("node_id"), "left_outer")
      .join(staffRawDS, edgesRawDS("start_id") === staffRawDS("node_id"), "left_outer")
    .join(businessRawDS, edgesRawDS("end_id") === businessRawDS("node_id"), "left_outer")
    .join(addressRawDS, edgesRawDS("end_id") === addressRawDS("node_id"), "left_outer")
    .join(staffRawDS, edgesRawDS("end_id") === staffRawDS("node_id"), "left_outer")

как мне использовать allJoinDS.map для получения соответствующих полей из карты нескольких наборов данных во вложенный класс case ? Мне не удалось найти какую-либо соответствующую информацию в Интернете.

...