Это должно работать для вас:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val dfA = Seq(
("001", "10", "Cat"),
("001", "20", "Dog"),
("001", "30", "Bear"),
("002", "10", "Mouse"),
("002", "20", "Squirrel"),
("002", "30", "Turtle")
).toDF("Package", "LineItem", "Animal")
val dfB = Seq(
("001", "", "X", "A"),
("001", "", "Y", "B"),
("002", "", "X", "C"),
("002", "", "Y", "D"),
("002", "20", "X" ,"E")
).toDF("Package", "LineItem", "Flag", "Category")
val result = {
dfA.as("a")
.join(dfB.where('Flag === "X").as("b"), $"a.Package" === $"b.Package" and ($"a.LineItem" === $"b.LineItem" or $"b.LineItem" === ""), "left")
.withColumn("anyRowsInGroupWithBLineItemDefined", first(when($"b.LineItem" =!= "", lit(true)), ignoreNulls = true).over(Window.partitionBy($"a.Package", $"a.LineItem")).isNotNull)
.where(!$"anyRowsInGroupWithBLineItemDefined" or ($"anyRowsInGroupWithBLineItemDefined" and $"b.LineItem" =!= ""))
.select($"a.Package", $"a.LineItem", $"a.Animal", $"b.Category")
}
result.orderBy($"a.Package", $"a.LineItem").show(false)
// +-------+--------+--------+--------+
// |Package|LineItem|Animal |Category|
// +-------+--------+--------+--------+
// |001 |10 |Cat |A |
// |001 |20 |Dog |A |
// |001 |30 |Bear |A |
// |002 |10 |Mouse |C |
// |002 |20 |Squirrel|E |
// |002 |30 |Turtle |C |
// +-------+--------+--------+--------+
"хитрая" часть вычисляет, есть ли строки с LineItem
, определенным в dfB
для данного Package
, LineItem
в dfA
. Вы можете увидеть, как я выполняю этот расчет в anyRowsInGroupWithBLineItemDefined
, который включает использование оконной функции. Кроме этого, это просто обычное SQL упражнение по программированию.
Также хочу отметить, что этот код должен быть более эффективным, чем другое решение, так как здесь мы только перетасовываем данные дважды (во время соединения и во время оконной функции) и только читаем в каждом наборе данных один раз.