Я пытаюсь написать функцию, которую можно использовать в нескольких наборах данных. Бизнес-логика c включает объединение с набором данных (с известной схемой). Я читал, как это выглядит, это можно сделать с помощью любого из 2 подходов (это мое лучшее предположение) -
- Выполнить типизированное соединение в Scala с Наборы данных Spark Пока ничего не написано для этого подхода.
- https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1061
def genericJoinMainFunction[A,B](sparkSession: SparkSession,input:Dataset[A]):Dataset[B] = {
val pmInput = "/random/input/path"
val knownDSLoad = FlatFileLoader.load[KnownSchema1](sparkSession,pmInput,Map("delimiter" -> "\t"))
val joinExpr = (knownDSLoad("col1")===input("col1") && knownDSLoad("col2")===input("col2") && knownDSLoad("col3")===input("col3"))
val joined = sparkSession.sessionState.executePlan(
Join(input.queryExecution.logical,knownDSLoad.queryExecution.logical,JoinType("left_outer"),
Some(joinExpr.expr))
).analyzed.asInstanceOf[Join]
import sparkSession.implicits._
// currently unsure of next step to successfully execute the join
// post join: call a custom business logic in map and attach a new column to case class A to generate case class B
}
Второй вариант - это пытаясь скопировать код соединения. Создание логического плана, а затем создание физического плана. Я застрял на том, как прийти написать физический план на данный момент. В приведенном выше коде - я ожидаю, что входной набор данных будет иметь столбцы col1 , col2 , col3 . После объединения запускается пользовательская функция бизнес-логики c, которая генерирует логическое значение и добавляется к записям. Таким образом, разница между входным набором данных со схемой A и выводом со схемой B составляет логический столбец .
Я запутался относительно того, что может быть возможным способом реализации этого типа функциональности. Ищете какое-то направление. Если есть вещи, которые я пропустил, доволен, дайте мне знать.