Как сравнить 2 файла в spark, используя scala с одной и той же схемой - PullRequest
0 голосов
/ 07 мая 2019

Итак, у меня есть эти 2 файла (принимая местоположение файла hdfs в таблице кустов), file1 с записями истории и file2 с записями текущего дня. У них обоих одинаковая схема. Теперь я хочу сделать процесс CDC, чтобы получить обновленные / вновь вставленные записи после сравнения двух файлов. В нескольких столбцах могут быть изменения, поэтому мы хотим извлечь все измененные столбцы сразу. Предположим, что столбцы: - Customer_ID, Имя, Адрес, Страна. Теперь Customer_ID является первичным ключом, тогда как остальные 3 столбца могут измениться.

Файл 1

12343| John| Rear exit market| SanFrancisco
45656| Bobs| Knewbound Road PD| Seattle
54345| Fersi| Dallas Road Pnth| Newyork
86575| Persa| Roman Building Path| Kirkland
64565| Camy| Olympus Ground 3rd| NewJersey

Файл 2

12343| John| World Centre Phase| SanFrancisco
54345| Posi| Dallas Road Pnth| Newyork

Я хочу, чтобы конечный результат выглядел так: -

12343|Rear exit market| World Centre Phase
54345| Fersi| Posi

Итак, я хочу, чтобы первичный ключ, предыдущие записи перед изменением, новая обновленная запись, которая была обновлена ​​в моем окончательном ответе.

1 Ответ

0 голосов
/ 10 мая 2019

Вот возможное решение. Как уже упоминалось в моем комментарии, это в значительной степени 3 или 4 лайнера, но я даю несколько альтернатив.

// Load the data into 2 dataframes
val df1 = spark.read.option("sep","|").csv("file1a.txt")
val df2 = spark.read.option("sep","|").csv("file2a.txt")

// Next join the two dataframes using an INNER JOIN on the key as follows:
val joined = df1.joinWith(df2, df1.col("_c0") === df2.col("_c0"))

В файле нет информации заголовка, поэтому столбцы получат имена по умолчанию. Схема соединения в основном представляет собой Tuple2, каждый из которых содержит список столбцов с каждой стороны объединения.

Вот схемы:

scala> df1.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)


scala> joined.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- _c0: string (nullable = true)
 |    |-- _c1: string (nullable = true)
 |    |-- _c2: string (nullable = true)
 |    |-- _c3: string (nullable = true)
 |-- _2: struct (nullable = false)
 |    |-- _c0: string (nullable = true)
 |    |-- _c1: string (nullable = true)
 |    |-- _c2: string (nullable = true)
 |    |-- _c3: string (nullable = true)

Последний шаг - это (я думаю) экстраполяция того, что вы хотите произвести. Я думаю, что вы хотите показать, какие столбцы имеют разные значения. Формат вывода, который вы показываете, имеет, IMHO, пару потенциальных проблем. Я думаю, что вы просто хотите показать в двух выходных столбцах значение, которое отличается. ИМХО, у этого есть пара проблем:

  1. Что если в записи есть два столбца с разными значениями - и, следовательно, вам нужно отобразить 4 (или более) значения для каждой выходной записи?
  2. Что, если имеется много записей с различиями, будет очень трудно найти исходные столбцы, которые были разными (поскольку любая идентификация столбца с разницей теряется в выходных данных) - это будет сложнее с записи с большим количеством столбцов.
  3. Решение, вероятно, будет более сложным, если вы перемешаете столбцы в наборе результатов.

Следующий выходной формат решает вышеуказанные задачи, показывая все столбцы вместе с индикатором, показывающим, какие столбцы имеют разные значения. Индикатор является ключевым, так как он делает различия легче обнаружить. Вот метод "грубой силы", при котором каждый столбец указан в списке и каждая разница определена вручную.

joined.select($"_1._C0".as("id"), $"_1._c1", $"_2._c1", when(col("_1._c1") === col("_2._c1"), "").otherwise("ne").as("c1 Ind"),
  $"_1._c2", $"_2._c2", when(col("_1._c2") === col("_2._c2"), "").otherwise("ne").as("c2 Ind"),
  $"_1._c3", $"_2._c3", when(col("_1._c3") === col("_2._c3"), "").otherwise("ne").as("c3 Ind")).show(false)

Который производит:

+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|id   |_c1  |_c1 |c1 Ind|_c2             |_c2               |c2 Ind|_c3         |_c3         |c3 Ind|
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|12343|John |John|      |Rear exit market|World Centre Phase|ne    |SanFrancisco|SanFrancisco|      |
|54345|Fersi|Posi|ne    |Dallas Road Pnth|Dallas Road Pnth  |      |Newyork     |Newyork     |      |
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+

Метод грубой силы утомителен и труден для ввода - особенно для больших наборов результатов. Поэтому мы можем использовать немного магии Скалы, чтобы сделать это немного более элегантным.

// Define a helper function that takes a column name and returns the three parts needed
// to generate the output for that column. i.e. select the column from the two sides of the joined result set
// and generate the case statement to generate the "ne" indicator if the two values
// are unequal.
def genComp(colName:String) = List(s"_1.$colName", s"_2.$colName", s"case when _1.$colName = _2.$colName then '' else 'ne' end as ${colName}_ind")

// Run the query to produce the results:
joined.selectExpr(
    (List("_1._C0 as id") ++ genComp("_c1") ++ genComp("_c2") ++ genComp("_c3")) : _*
  ).show(false)

При запуске это приводит к тому же результату, что и метод "грубой силы".

Как это работает? Ну, магия во второй строке и особенность метода selectExpr.

Метод selectExpr имеет следующую подпись: def selectExpr(exprs: String*): org.apache.spark.sql.DataFrame. Это означает, что он может принимать переменное количество строковых аргументов.

Для генерации аргументов, передаваемых selectExpr, я использую эту конструкцию List (strings) : _*. Это "магия" Scala, которая берет список строк и преобразует его в список параметров с переменным числом аргументов.

Все остальное довольно просто. В основном функция genComp возвращает список строк, которые идентифицируют столбец с каждой стороны объединенного DataFrame вместе с логикой генерации индикатора неравенства. Объединяет их всех вместе. Результат преобразуется в список параметров, передаваемый в selectExpr, который в конечном итоге выполняет тот же запрос, что и метод "грубой силы".

Вот интересная мысль, которую я оставлю вам в качестве упражнения: используйте схему df1, чтобы сгенерировать список столбцов для вывода с использованием genComp (в отличие от простого ручного объединения их, как я показал).

Вот подсказка big :

val cols = df1.schema.filter(c => c.name != "_c0").map(c => List(c.name)).flatten
cols.foreach(println)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...