Объединить два набора данных на основе значения - PullRequest
1 голос
/ 17 марта 2020

У меня есть следующие два набора данных:

val dfA = Seq(
("001", "10", "Cat"),
("001", "20", "Dog"),
("001", "30", "Bear"),
("002", "10", "Mouse"),
("002", "20", "Squirrel"),
("002", "30", "Turtle"),
).toDF("Package", "LineItem", "Animal")

val dfB = Seq(
("001", "", "X", "A"),
("001", "", "Y", "B"),
("002", "", "X", "C"),
("002", "", "Y", "D"),
("002", "20", "X" ,"E")
).toDF("Package", "LineItem", "Flag", "Category")

Мне нужно объединить их с указанными c условиями:

a) В dfB всегда есть строка с флагом X и пустой LineItem, который должен быть категорией по умолчанию для пакета из dfA

b) Когда в dfB указан LineItem, категория по умолчанию должна быть перезаписана с категорией, связанной с этим LineItem

Ожидаемый вывод :

+---------+----------+----------+----------+
| Package | LineItem | Animal   | Category |
+---------+----------+----------+----------+
| 001     | 10       | Cat      | A        |
+---------+----------+----------+----------+
| 001     | 20       | Dog      | A        |
+---------+----------+----------+----------+
| 001     | 30       | Bear     | A        |
+---------+----------+----------+----------+
| 002     | 10       | Mouse    | C        |
+---------+----------+----------+----------+
| 002     | 20       | Squirrel | E        |
+---------+----------+----------+----------+
| 002     | 30       | Turtle   | C        |
+---------+----------+----------+----------+

Сегодня я провожу некоторое время, но понятия не имею, как это можно сделать. Я ценю вашу помощь. Спасибо!

Ответы [ 2 ]

1 голос
/ 18 марта 2020

Вы можете использовать два оператора join + when:

val dfC = dfA
  .join(dfB, dfB.col("Flag") === "X" && dfA.col("LineItem") === dfB.col("LineItem") && dfA.col("Package") === dfB.col("Package"))
  .select(dfA.col("Package").as("priorPackage"), dfA.col("LineItem").as("priorLineItem"), dfB.col("Category").as("priorCategory"))
  .as("dfC")

val dfD = dfA
  .join(dfB, dfB.col("LineItem") === "" && dfB.col("Flag") === "X" && dfA.col("Package") === dfB.col("Package"), "left_outer")
  .join(dfC, dfA.col("LineItem") === dfC.col("priorLineItem") && dfA.col("Package") === dfC.col("priorPackage"), "left_outer")
  .select(
    dfA.col("package"),
    dfA.col("LineItem"),
    dfA.col("Animal"),
    when(dfC.col("priorCategory").isNotNull, dfC.col("priorCategory")).otherwise(dfB.col("Category")).as("Category")
  )

dfD.show()
0 голосов
/ 19 марта 2020

Это должно работать для вас:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val dfA = Seq(
("001", "10", "Cat"),
("001", "20", "Dog"),
("001", "30", "Bear"),
("002", "10", "Mouse"),
("002", "20", "Squirrel"),
("002", "30", "Turtle")
).toDF("Package", "LineItem", "Animal")

val dfB = Seq(
("001", "", "X", "A"),
("001", "", "Y", "B"),
("002", "", "X", "C"),
("002", "", "Y", "D"),
("002", "20", "X" ,"E")
).toDF("Package", "LineItem", "Flag", "Category")

val result = { 
    dfA.as("a")
    .join(dfB.where('Flag === "X").as("b"), $"a.Package" === $"b.Package" and ($"a.LineItem" === $"b.LineItem" or $"b.LineItem" === ""), "left")
    .withColumn("anyRowsInGroupWithBLineItemDefined", first(when($"b.LineItem" =!= "", lit(true)), ignoreNulls = true).over(Window.partitionBy($"a.Package", $"a.LineItem")).isNotNull)
    .where(!$"anyRowsInGroupWithBLineItemDefined" or ($"anyRowsInGroupWithBLineItemDefined" and $"b.LineItem" =!= ""))
    .select($"a.Package", $"a.LineItem", $"a.Animal", $"b.Category")
}

result.orderBy($"a.Package", $"a.LineItem").show(false)

// +-------+--------+--------+--------+
// |Package|LineItem|Animal  |Category|
// +-------+--------+--------+--------+
// |001    |10      |Cat     |A       |
// |001    |20      |Dog     |A       |
// |001    |30      |Bear    |A       |
// |002    |10      |Mouse   |C       |
// |002    |20      |Squirrel|E       |
// |002    |30      |Turtle  |C       |
// +-------+--------+--------+--------+

"хитрая" часть вычисляет, есть ли строки с LineItem, определенным в dfB для данного Package, LineItem в dfA. Вы можете увидеть, как я выполняю этот расчет в anyRowsInGroupWithBLineItemDefined, который включает использование оконной функции. Кроме этого, это просто обычное SQL упражнение по программированию.

Также хочу отметить, что этот код должен быть более эффективным, чем другое решение, так как здесь мы только перетасовываем данные дважды (во время соединения и во время оконной функции) и только читаем в каждом наборе данных один раз.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...