Как сравнить наборы данных? - PullRequest
0 голосов
/ 28 февраля 2019

У меня есть приложение Spark, которое выполняет пользовательские запросы правильной формы к набору данных.Каждый из них работает только с подмножеством всего набора данных, называемого «группами», которые на самом деле являются просто фильтрами в наборе данных и могут быть определены программистом.

type Group = DataFrame => DataFrame
val groupA = _.filter($"column1" > 0)
val groupB = _.filter($"column2" > 0 && $"column3" === 0)

val constraint1 = constraint(groupA, _.count == 0)
val constraint2 = constraint(groupA, _.dropDuplicates($"column3").count == 1)
// and so on
val constraint3 = constraint(groupB, _.count == 0)
...

framework.add(constraint1, constraint2, constraint3)
framework.execute()

Для каждого будет много ограниченийгруппа, поэтому, чтобы ускорить ее, я хочу собрать ограничения по их группам, кэшировать группу и запускать их ограничения друг за другом (или параллельно).

Итак, чтобы определить, принадлежат ли два ограничениядля них же группы Мне нужен какой-то способ сравнить наборы данных на равенство .

Моя идея состояла в том, чтобы сравнить их, используя semanticHash логических плановНаборы данных, однако есть несколько логических планов, связанных с одним набором данных, и мне интересно, какой выбрать.

Каков наилучший способ сделать это?

1 Ответ

0 голосов
/ 28 февраля 2019

Итак, я немного поэкспериментировал и обнаружил следующее в Spark 2.4.0

def equal(a: Dataset[Row], b: Dataset[Row], expected: Boolean) = {
  println(s"by logical hashCode ${a.queryExecution.logical.semanticHash == b.queryExecution.logical.semanticHash}")
  println(s"by logical sameResult ${a.queryExecution.logical.sameResult(b.queryExecution.logical)}")
  println(s"by optimized hashCode ${a.queryExecution.optimizedPlan.semanticHash == b.queryExecution.optimizedPlan.semanticHash}")
  println(s"by optimized sameResult ${a.queryExecution.optimizedPlan.sameResult(b.queryExecution.optimizedPlan)}")
  println(s"expected: $expected")
  println("\n")
}

val a = spark.createDataset(Seq(1, 2)).filter($"value" > 1).filter($"value" > 1).toDF
val b = spark.createDataset(Seq(1, 2)).filter($"value" > 1).toDF
val c = spark.createDataset(Seq(2, 3)).filter($"value" > 1).toDF
val d = spark.createDataset(Seq(2, 3)).filter($"value" < 1).toDF
val e = spark.read.parquet("/test_1")
val f = spark.read.parquet("/test_1")
val g = spark.read.parquet("/test_2")
val h = spark.read.parquet("/test_1").filter($"value" < 1)
val i = spark.read.parquet("/test_1").filter($"value" > 1)

equal(a, b, true)
// by logical hashCode false 
// by logical sameResult false 
// by optimized hashCode true 
// by optimized sameResult true 
// expected: true 

equal(b, c, false)
// by logical hashCode false 
// by logical sameResult false 
// by optimized hashCode false 
// by optimized sameResult false 
// expected: false 


equal(c, d, false)
// by logical hashCode true 
// by logical sameResult false 
// by optimized hashCode false 
// by optimized sameResult false 
// expected: false 


equal(e, f, true)
// by logical hashCode true 
// by logical sameResult true 
// by optimized hashCode true 
// by optimized sameResult true 
// expected: true 

equal(e, g, false)
// by logical hashCode false 
// by logical sameResult false 
// by optimized hashCode false 
// by optimized sameResult false 
// expected: false

equal(h, i, false)
// by logical hashCode true 
// by logical sameResult false
// by optimized hashCode true
// by optimized sameResult false
// expected: false

Так что, думаю, я хочу выбрать sameResults в оптимизированном плане.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...