Вот возможное решение. Как уже упоминалось в моем комментарии, это в значительной степени 3 или 4 лайнера, но я даю несколько альтернатив.
// Load the data into 2 dataframes
val df1 = spark.read.option("sep","|").csv("file1a.txt")
val df2 = spark.read.option("sep","|").csv("file2a.txt")
// Next join the two dataframes using an INNER JOIN on the key as follows:
val joined = df1.joinWith(df2, df1.col("_c0") === df2.col("_c0"))
В файле нет информации заголовка, поэтому столбцы получат имена по умолчанию. Схема соединения в основном представляет собой Tuple2, каждый из которых содержит список столбцов с каждой стороны объединения.
Вот схемы:
scala> df1.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
scala> joined.printSchema
root
|-- _1: struct (nullable = false)
| |-- _c0: string (nullable = true)
| |-- _c1: string (nullable = true)
| |-- _c2: string (nullable = true)
| |-- _c3: string (nullable = true)
|-- _2: struct (nullable = false)
| |-- _c0: string (nullable = true)
| |-- _c1: string (nullable = true)
| |-- _c2: string (nullable = true)
| |-- _c3: string (nullable = true)
Последний шаг - это (я думаю) экстраполяция того, что вы хотите произвести. Я думаю, что вы хотите показать, какие столбцы имеют разные значения. Формат вывода, который вы показываете, имеет, IMHO, пару потенциальных проблем. Я думаю, что вы просто хотите показать в двух выходных столбцах значение, которое отличается. ИМХО, у этого есть пара проблем:
- Что если в записи есть два столбца с разными значениями - и, следовательно, вам нужно отобразить 4 (или более) значения для каждой выходной записи?
- Что, если имеется много записей с различиями, будет очень трудно найти исходные столбцы, которые были разными (поскольку любая идентификация столбца с разницей теряется в выходных данных) - это будет сложнее с записи с большим количеством столбцов.
- Решение, вероятно, будет более сложным, если вы перемешаете столбцы в наборе результатов.
Следующий выходной формат решает вышеуказанные задачи, показывая все столбцы вместе с индикатором, показывающим, какие столбцы имеют разные значения. Индикатор является ключевым, так как он делает различия легче обнаружить.
Вот метод "грубой силы", при котором каждый столбец указан в списке и каждая разница определена вручную.
joined.select($"_1._C0".as("id"), $"_1._c1", $"_2._c1", when(col("_1._c1") === col("_2._c1"), "").otherwise("ne").as("c1 Ind"),
$"_1._c2", $"_2._c2", when(col("_1._c2") === col("_2._c2"), "").otherwise("ne").as("c2 Ind"),
$"_1._c3", $"_2._c3", when(col("_1._c3") === col("_2._c3"), "").otherwise("ne").as("c3 Ind")).show(false)
Который производит:
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|id |_c1 |_c1 |c1 Ind|_c2 |_c2 |c2 Ind|_c3 |_c3 |c3 Ind|
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|12343|John |John| |Rear exit market|World Centre Phase|ne |SanFrancisco|SanFrancisco| |
|54345|Fersi|Posi|ne |Dallas Road Pnth|Dallas Road Pnth | |Newyork |Newyork | |
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
Метод грубой силы утомителен и труден для ввода - особенно для больших наборов результатов. Поэтому мы можем использовать немного магии Скалы, чтобы сделать это немного более элегантным.
// Define a helper function that takes a column name and returns the three parts needed
// to generate the output for that column. i.e. select the column from the two sides of the joined result set
// and generate the case statement to generate the "ne" indicator if the two values
// are unequal.
def genComp(colName:String) = List(s"_1.$colName", s"_2.$colName", s"case when _1.$colName = _2.$colName then '' else 'ne' end as ${colName}_ind")
// Run the query to produce the results:
joined.selectExpr(
(List("_1._C0 as id") ++ genComp("_c1") ++ genComp("_c2") ++ genComp("_c3")) : _*
).show(false)
При запуске это приводит к тому же результату, что и метод "грубой силы".
Как это работает? Ну, магия во второй строке и особенность метода selectExpr.
Метод selectExpr имеет следующую подпись: def selectExpr(exprs: String*): org.apache.spark.sql.DataFrame
. Это означает, что он может принимать переменное количество строковых аргументов.
Для генерации аргументов, передаваемых selectExpr, я использую эту конструкцию List (strings) : _*
. Это "магия" Scala, которая берет список строк и преобразует его в список параметров с переменным числом аргументов.
Все остальное довольно просто. В основном функция genComp возвращает список строк, которые идентифицируют столбец с каждой стороны объединенного DataFrame вместе с логикой генерации индикатора неравенства. Объединяет их всех вместе. Результат преобразуется в список параметров, передаваемый в selectExpr, который в конечном итоге выполняет тот же запрос, что и метод "грубой силы".
Вот интересная мысль, которую я оставлю вам в качестве упражнения: используйте схему df1, чтобы сгенерировать список столбцов для вывода с использованием genComp (в отличие от простого ручного объединения их, как я показал).
Вот подсказка big :
val cols = df1.schema.filter(c => c.name != "_c0").map(c => List(c.name)).flatten
cols.foreach(println)