Сравнение значений столбцов в кадре данных Spark - PullRequest
0 голосов
/ 30 августа 2018

У меня есть фрейм данных, который содержит огромное количество записей. В этом DF запись может повторяться несколько раз, и каждый раз, когда она обновляется, в последнем обновленном поле будет указана дата ее изменения.

У нас есть группа столбцов, по которым мы хотим сравнить строки с одинаковыми идентификаторами. Во время этого сравнения мы хотим получить информацию о том, какие поля / столбцы изменились с предыдущей записи к текущей записи, и зафиксировать это в столбце «updated_columns» обновленной записи. Сравните эту вторую запись с третьей записью и определите обновленные столбцы и запишите, что в поле «updated_columns» третьей записи, продолжайте то же самое до последней записи этого идентификатора и делайте то же самое для каждого идентификатора, который имеет более одной записи .

Первоначально мы сгруппировали столбцы и создали хэш из этой группы столбцов и сравнили со значениями хеш-функции следующей строки, таким образом, это помогает мне идентифицировать записи, которые имеют обновления, но хотят получить столбцы, которые были обновлены.

Здесь я делюсь некоторыми данными, которые являются ожидаемым результатом, и именно так должны выглядеть окончательные данные после добавления обновленных столбцов (здесь я могу сказать, используйте столбцы Col1, Col2, Col3, col4 и Col5 для сравнения между двумя строками ):

enter image description here

Хотите сделать это эффективным способом. Любой пробовал что-то подобное.

Нужна помощь!

~ Криш.

Ответы [ 2 ]

0 голосов
/ 01 сентября 2018

A окно может быть использовано.

Идея состоит в том, чтобы сгруппировать данные по ID , отсортировать их по LAST-UPDATED , скопировать значения предыдущей строки (если она существует) в текущую строку, а затем сравнить скопированные данные с текущими значениями.

val data = ... //the dataframe has the columns ID,Col1,Col2,Col3,Col4,Col5,LAST_UPDATED,IS_DELETED

val fieldNames = data.schema.fieldNames.dropRight(1) //1
val columns = fieldNames.map(f => col(f))
val windowspec = Window.partitionBy("ID").orderBy("LAST_UPDATED") //2
def compareArrayUdf() = ... //3

val result = data
  .withColumn("cur", array(columns: _*)) //4
  .withColumn("prev", lag($"cur", 1).over(windowspec)) //5
  .withColumn("updated_columns", compareArrayUdf()($"cur", $"prev")) //6
  .drop("cur", "prev") //7
  .orderBy("LAST_UPDATED")

Примечания:

  1. создать список всех полей для сравнения. Все поля кроме последнего ( LAST-UPDATED ) используются
  2. создать окно, которое разделено на ID , и каждый раздел отсортирован по LAST-UPDATED
  3. создать udf, который сравнивает два массива и отображает обнаруженные различия в имена полей, код см. Ниже
  4. создать новый столбец, который содержит все значения, которые нужно сравнить
  5. создайте новый столбец, который содержит все значения предыдущей строки (с использованием функции lag ), которые необходимо сравнить. Предыдущая строка - это строка с тем же ID и самой большой LAST-UPDATED , которая меньше текущей. Это поле может быть пустым
  6. сравните два новых столбца и поместите результат в updated-столбцы
  7. отбросьте два промежуточных столбца, созданных на шаге 3 и 4

Сравнить ArraysUdf равно

def compareArray(cur: mutable.WrappedArray[String], prev: mutable.WrappedArray[String]): String = {
  if (prev == null || cur == null) return ""
  val res = new StringBuilder
  for (i <- cur.indices) {
    if (!cur(i).contentEquals(prev(i))) {
      if (res.nonEmpty) res.append(",")
      res.append(fieldNames(i))
    }
  }
  res.toString()
}
def compareArrayUdf() = udf[String, mutable.WrappedArray[String], mutable.WrappedArray[String]](compareArray)
0 голосов
/ 30 августа 2018

Вы можете присоединить свой DataFrame или DataSet к себе, объединяя строки, в которых идентификатор одинаков в обеих строках, а версия левой строки равна i, а версия правой строки - i+1. Вот пример

case class T(id: String, version: Int, data: String)

val data = Seq(T("1", 1, "d1-1"), T("1", 2, "d1-2"), T("2", 1, "d2-1"), T("2", 2, "d2-2"), T("2", 3, "d2-3"), T("3", 1, "d3-1"))
data: Seq[T] = List(T(1,1,d1-1), T(1,2,d1-2), T(2,1,d2-1), T(2,2,d2-2), T(2,3,d2-3), T(3,1,d3-1))

val ds = data.toDS

val joined = ds.as("ds1").join(ds.as("ds2"), $"ds1.id" === $"ds2.id" && (($"ds1.version"+1) === $"ds2.version"))

И затем вы можете ссылаться на столбцы в новом DataFrame / DataSet, например $"ds1.data, $"ds2.data и т. Д.

Чтобы найти строки, в которых данные изменились с одной версии на другую, вы можете сделать

joined.filter($"ds1.data" !== $"ds2.data")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...