ИМХО, не совсем понятно, что вы подразумеваете под «оптимальным»; ниже описан один из возможных способов.
Вариант 1:
Если нужно найти различия в схемах двух DataFrame:
/**
 * getAllSchemaDiff - compares two column maps (as built by getMapFromSchema) and returns the
 * columns whose (data type, nullable) entry differs, including columns present on one side only.
 *
 * Column names are compared case-insensitively: the keys of both maps are lower-cased before
 * the comparison, so the returned map is keyed by lower-cased names. If one map contains two
 * keys that differ only by case, the later one in that map's iteration order wins.
 *
 * @param schema1 Map[String, (DataType, Boolean)] - column name -> (data type, nullable) of the first schema
 * @param schema2 Map[String, (DataType, Boolean)] - column name -> (data type, nullable) of the second schema
 * @return Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] -
 *         lower-cased column name -> (entry in schema1, entry in schema2); `None` marks a column
 *         missing from that schema. Columns whose entries are identical are omitted.
 */
def getAllSchemaDiff(schema1: Map[String, (DataType, Boolean)],
                     schema2: Map[String, (DataType, Boolean)]
                    ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  // Normalise the keys themselves, not only the names we iterate over: looking up a
  // lower-cased name in a map with mixed-case keys yields None, which would either report a
  // bogus "missing" column or, when both sides are None, silently drop the column.
  val lowered1 = schema1.map { case (name, field) => name.toLowerCase -> field }
  val lowered2 = schema2.map { case (name, field) => name.toLowerCase -> field }
  val common = lowered1.keySet.intersect(lowered2.keySet).toList
  LOGGER.info(" common columns are " + common.mkString("\n"))
  val distinctkeys = (schema1.keys ++ schema2.keys).map(_.toLowerCase).toList.distinct
  LOGGER.info("distinctkeys - > " + distinctkeys)
  distinctkeys.flatMap { columnName =>
    val schema1FieldOpt = lowered1.get(columnName)
    val schema2FieldOpt = lowered2.get(columnName)
    if (schema1FieldOpt == schema2FieldOpt) None
    else Some((columnName, (schema1FieldOpt, schema2FieldOpt)))
  }.toMap
}
где getMapFromSchema — это:
/**
 * getMapFromSchema - indexes the columns of a DataFrame by lower-cased name.
 *
 * Each entry maps the lower-cased column name to the column's (data type, nullable) pair.
 * If two columns differ only by case, the one appearing later in the schema wins.
 *
 * @param df DataFrame whose schema is read
 * @return lower-cased column name -> (dataType, nullable)
 */
def getMapFromSchema(df: DataFrame): Map[String, (DataType, Boolean)] =
  df.schema.fields
    .map(field => (field.name.toLowerCase, (field.dataType, field.nullable)))
    .toMap
Если нужно найти различия в значениях по столбцам:
/**
 * columnWiseDifferences - finds column-wise differences between 2 DataFrames, one the source
 * and the other the target.
 *
 * For every column of `sourceDataFrame`, the values present on one side but not on the other
 * are computed with `except` (SQL EXCEPT DISTINCT semantics: duplicates collapse and row
 * alignment is ignored) and printed with `show` when non-empty, first source-minus-target,
 * then target-minus-source. `targetDataFrame` is expected to contain every column of
 * `sourceDataFrame`.
 *
 * @param sourceDataFrame source DataFrame; its columns drive the comparison
 * @param targetDataFrame target DataFrame
 */
def columnWiseDifferences(sourceDataFrame: DataFrame, targetDataFrame: DataFrame): Unit = {
  // Backtick-quote names (doubling embedded backticks) so columns containing dots or other
  // special characters are resolved literally instead of as nested-field references.
  def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

  // Shows, per column, the values of `left` missing from `right`. take(1) stops at the first
  // difference instead of counting all of them before `show` evaluates the same plan again.
  def showDifferences(columns: Array[String], left: DataFrame, right: DataFrame): Unit =
    columns.foreach { name =>
      val column = quoted(name)
      val diff = left.select(column).except(right.select(column))
      if (diff.take(1).nonEmpty) diff.show()
    }

  val columns = sourceDataFrame.schema.fields.map(_.name)
  // LOGGER.info("source schema")
  // sourceDataFrame.printSchema
  // LOGGER.info("target schema")
  // targetDataFrame.printSchema
  LOGGER.info("Source except target")
  showDifferences(columns, sourceDataFrame, targetDataFrame)
  LOGGER.info("target except source")
  showDifferences(columns, targetDataFrame, sourceDataFrame)
}
Полный пример:
package com.examples
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataType, StructField}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
/**
 * ComparisionUtil - helpers for comparing two Spark DataFrames: schema differences
 * (column name, data type, nullability) and column-wise value differences.
 *
 * @author Ram Ghadiyaram
 */
object ComparisionUtil {

  private[this] val LOGGER: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Demo entry point: builds two small DataFrames whose `grade` column differs in type
   * (string vs integer), logs their schema differences and then shows the column-wise value
   * differences over the columns they have in common.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val logger = org.apache.log4j.Logger.getLogger("org")
    logger.setLevel(Level.WARN)
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getName).master("local[*]").getOrCreate()
    // Always release the local Spark context, even if one of the comparisons throws.
    try {
      import spark.implicits._
      val df1 = Seq(
        (1, "Ram", ""),
        (2, "william peck", ""),
        (3, "peck", "")
      ).toDF("id", "name", "grade")
      val df2 = Seq(
        (1, "Ram", 3),
        (2, "william peck", 3),
        (3, "peck", 3)
      ).toDF("id", "name", "grade")
      val diffMap = getAllSchemaDiff(getMapFromSchema(df1), getMapFromSchema(df2))
      LOGGER.info("schema differences are " + diffMap)
      val (srcCommon, tgtCommon) = getDataFramesWithCommonColumns(df1, df2)
      columnWiseDifferences(srcCommon, tgtCommon)
    } finally {
      spark.stop()
    }
  }

  /**
   * getAllSchemaDiff - compares two column maps (as built by getMapFromSchema) and returns the
   * columns whose (data type, nullable) entry differs, including columns present on one side only.
   *
   * Column names are compared case-insensitively: the keys of both maps are lower-cased before
   * the comparison, so the returned map is keyed by lower-cased names. If one map contains two
   * keys that differ only by case, the later one in that map's iteration order wins.
   *
   * @param schema1 Map[String, (DataType, Boolean)] - column name -> (data type, nullable) of the first schema
   * @param schema2 Map[String, (DataType, Boolean)] - column name -> (data type, nullable) of the second schema
   * @return Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] -
   *         lower-cased column name -> (entry in schema1, entry in schema2); `None` marks a column
   *         missing from that schema. Columns whose entries are identical are omitted.
   */
  def getAllSchemaDiff(schema1: Map[String, (DataType, Boolean)],
                       schema2: Map[String, (DataType, Boolean)]
                      ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
    // Normalise the keys themselves: looking up a lower-cased name in a map with mixed-case
    // keys yields None, which would report a bogus "missing" column or silently drop it.
    val lowered1 = schema1.map { case (name, field) => name.toLowerCase -> field }
    val lowered2 = schema2.map { case (name, field) => name.toLowerCase -> field }
    val common = lowered1.keySet.intersect(lowered2.keySet).toList
    LOGGER.info(" common columns are " + common.mkString("\n"))
    val distinctkeys = (schema1.keys ++ schema2.keys).map(_.toLowerCase).toList.distinct
    LOGGER.info("distinctkeys - > " + distinctkeys)
    distinctkeys.flatMap { columnName =>
      val schema1FieldOpt = lowered1.get(columnName)
      val schema2FieldOpt = lowered2.get(columnName)
      if (schema1FieldOpt == schema2FieldOpt) None
      else Some((columnName, (schema1FieldOpt, schema2FieldOpt)))
    }.toMap
  }

  /**
   * getDataFramesWithCommonColumns - projects both DataFrames onto the columns they share.
   *
   * Columns are matched case-insensitively. Both projections use the same column order (the
   * order of `srcData`), so the results can be compared position-wise, e.g. with `except`.
   *
   * @param srcData    source DataFrame
   * @param targetData target DataFrame
   * @return (srcData, targetData), each restricted to the common columns in the same order
   */
  def getDataFramesWithCommonColumns(srcData: DataFrame, targetData: DataFrame): (DataFrame, DataFrame) = {
    // lower-cased name -> actual column name, so each side is addressed by its real name
    val srcNames = srcData.columns.map(name => name.toLowerCase -> name).toMap
    val targetNames = targetData.columns.map(name => name.toLowerCase -> name).toMap
    // Source column order instead of arbitrary Set iteration order, for readable output.
    val common = srcData.columns.map(_.toLowerCase).distinct.filter(targetNames.contains).toList
    LOGGER.info(" srcData " + srcData.schema.treeString)
    LOGGER.info(" targetData " + targetData.schema.treeString)
    LOGGER.info(" **** \n\n\n\ncommon columns in source and target are \n " + common.mkString("\n"))
    // Select quoted column references rather than selectExpr: selectExpr parses every name as
    // a SQL expression, which breaks on names with spaces, dots or reserved words.
    val df1 = srcData.select(common.map(name => srcData.col(quoted(srcNames(name)))): _*)
    val df2 = targetData.select(common.map(name => targetData.col(quoted(targetNames(name)))): _*)
    (df1, df2)
  }

  /**
   * getMapFromSchema - extracts name (key, lower-cased), type and nullability (values) of columns.
   * If two columns differ only by case, the one appearing later in the schema wins.
   *
   * @param df DataFrame whose schema is read
   * @return lower-cased column name -> (dataType, nullable)
   */
  def getMapFromSchema(df: DataFrame): Map[String, (DataType, Boolean)] =
    df.schema.map { structField: StructField =>
      (structField.name.toLowerCase, (structField.dataType, structField.nullable))
    }.toMap

  /**
   * columnWiseDifferences - finds column-wise differences between 2 DataFrames, one the source
   * and the other the target.
   *
   * Prints both schemas, then for every column of `sourceDataFrame` shows (when non-empty) the
   * values present on one side but not the other, computed with `except` (SQL EXCEPT DISTINCT
   * semantics: duplicates collapse and row alignment is ignored).
   *
   * @param sourceDataFrame source DataFrame; its columns drive the comparison
   * @param targetDataFrame target DataFrame, expected to contain every source column
   */
  def columnWiseDifferences(sourceDataFrame: DataFrame, targetDataFrame: DataFrame): Unit = {
    val columns = sourceDataFrame.schema.fields.map(_.name)
    LOGGER.info("source schema")
    sourceDataFrame.printSchema()
    LOGGER.info("target schema")
    targetDataFrame.printSchema()
    LOGGER.info("Source except target")
    showDifferences(columns, sourceDataFrame, targetDataFrame)
    LOGGER.info("target except source")
    showDifferences(columns, targetDataFrame, sourceDataFrame)
  }

  /**
   * getListFromSchemaWithKeysOnly - extracts the (lower-cased) names of the columns.
   *
   * @param df DataFrame whose schema is read
   * @return lower-cased column names in schema order
   */
  def getListFromSchemaWithKeysOnly(df: DataFrame): List[String] =
    df.schema.fields.map(_.name.toLowerCase).toList

  /**
   * Shows, per column, the values of `left` that are missing from `right`. take(1) stops at the
   * first difference instead of counting all of them before `show` evaluates the plan again.
   */
  private def showDifferences(columns: Array[String], left: DataFrame, right: DataFrame): Unit =
    columns.foreach { name =>
      val column = quoted(name)
      val diff = left.select(column).except(right.select(column))
      if (diff.take(1).nonEmpty) diff.show()
    }

  /** Backtick-quotes a column name (doubling embedded backticks) so Spark resolves it literally. */
  private def quoted(name: String): String = "`" + name.replace("`", "``") + "`"
}
Результат:
2019-05-30 12:44:49 INFO ComparisionUtil$:48 - common columns are id
name
grade
2019-05-30 12:44:49 INFO ComparisionUtil$:51 - distinctkeys - > List(id, name, grade)
2019-05-30 12:44:49 INFO ComparisionUtil$:29 - schema differences are Map(grade -> (Some((StringType,true)),Some((IntegerType,false))))
2019-05-30 12:44:49 INFO ComparisionUtil$:68 - srcData root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- grade: string (nullable = true)
2019-05-30 12:44:49 INFO ComparisionUtil$:69 - targetData root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- grade: integer (nullable = false)
2019-05-30 12:44:49 INFO ComparisionUtil$:70 - ****
common columns in source and target are
id
name
grade
2019-05-30 12:44:49 INFO ComparisionUtil$:101 - source schema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- grade: string (nullable = true)
2019-05-30 12:44:49 INFO ComparisionUtil$:103 - target schema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- grade: integer (nullable = false)
2019-05-30 12:44:49 INFO ComparisionUtil$:105 - Source except target
+-----+
|grade|
+-----+
| |
+-----+
2019-05-30 12:44:52 INFO ComparisionUtil$:114 - target except source
+-----+
|grade|
+-----+
| 3|
+-----+
Примечание: если вы собираетесь использовать поколоночное сравнение из приведённого выше фрагмента кода, сначала проверьте, совпадает ли количество строк в ваших таблицах Oracle, и переходите к поколоночному сравнению, только если оно не совпадает. В противном случае на больших наборах данных поколоночное сравнение займёт заметное время.
Если различия по столбцам и схемам вам не нужны, можно воспользоваться более простым подходом — применить except к самим DataFrame. Для созданных выше DataFrame:
Вариант 2:
Ещё один, более простой вариант — с использованием except: сначала «левая сторона except правая сторона», затем наоборот.
// Option 2: compare whole rows in both directions. `except` has SQL EXCEPT DISTINCT
// semantics, so duplicate rows are collapsed in the output.
println("Another option using except which might be simple is ")

// rows of df1 that have no identical row in df2
val difference1 = df1.except(df2)
Console.println("left hand side except right hand side ")
difference1.show(20)

// rows of df2 that have no identical row in df1
val difference2 = df2.except(df1)
Console.println("right hand side except left hand side ")
difference2.show(20)
Результат:
Another option using except which might be simple is
left hand side except right hand side
+---+------------+-----+
| id| name|grade|
+---+------------+-----+
| 2|william peck| |
| 3| peck| |
| 1| Ram| |
+---+------------+-----+
right hand side except left hand side
+---+------------+-----+
| id| name|grade|
+---+------------+-----+
| 3| peck| 3|
| 1| Ram| 3|
| 2|william peck| 3|
+---+------------+-----+