Scala Spark столбцы DataFrame в виде карты и сравнить их с помощью Foldleft - PullRequest
0 голосов
/ 07 марта 2019

Все, чего я хочу добиться, это: Изображение 1 Итак, здесь вы можете видеть в первом изображении, у меня есть фрейм данных, в котором первые 4 строки имеют правильные значения хеш-функции, хранящиеся в соответствующих столбцах ("col_1_hash" имеетхеш-значение «col_1» и «col_2_hash» имеет хеш-значение «col_2») в строке 5 оба имеют рабочие хеш-значения (col_1: a, col_2: z, col_1_hash: имеет хеш-значение «z», col_2_hash: имеет хеш-значение«a») и строка 6 имеет одно правое и одно значение рабочего (col_1: d, col_2: w, col_1_hash: имеет хеш-значение «d» (правильно), col_2_hash: имеет хеш-значение «z» (неправильно))

val totallytemp = xtranwedf.filter(( sha2($"col_1",256)  =!= $"col_1_hash") ||
  (sha2($"col_2",256)  =!= $"col_2_hash"))
val total = totallytemp.count

это даст вывод:

total: Long = 2

Выше результатов это то, чего я хочу достичь с помощью foldLeft.Поскольку есть две записи, где есть atleastonematch.

сейчас здесь я знаю, что есть простой способ достичь этого, но просто я не хочу передавать жестко закодированные значения.

Так что яя выполняю сбор данных на фрейме и получаю из него список значений и карту создания.вы увидите на втором изображении. Изображение 2 поэтому здесь я передаю карту и создаю аккумулятор, но он не дает ответа, который должен.как вы увидите на рисунке 1, я хочу получить ответ 2, но этот код дает ответ 6.

val templist = "col_1" :: "col_2" :: Nil
val tempmapingList = Map(templist map {s => (s, s + "_hash")} : _*)

val expr: Column = tempmapingList.foldLeft(lit(false)) 
  { 
  case (acc, (c, h)) => acc or (sha2(col(c), 256) =!= h) 
  }
xtranwedf.filter(expr).count

это дает вывод:

total: Long = 6

Я хочу, чтобы здесь было 2. ноЯ думаю, что это как-то связано со знаком === или =, когда он не создает новый столбец, для которого я могу выполнить подсчет.

1 Ответ

0 голосов
/ 07 марта 2019

Проблема с вашим foldLeft приложением заключается в том, что оно не эквивалентно выражению, которое вы хотите использовать.

Как вы уже сказали, вы ищете

sha2(b, 256) = b_hash OR sha2(c, 256) = c_hash OR sha2(d, 256) = d_hash

, тогда как цепочечный фильтр на DataFrame приводит к

sha2(b, 256) = b_hash AND sha2(c, 256) = c_hash AND sha2(d, 256) = d_hash

Для достижения первого вам нужно заменить аккумулятор:

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.Column

val atLeastOneMatch: Column = map.foldLeft(lit(false)) { 
  case (acc, (c, h)) => acc or (sha2(col(c), 256) === h) 
}

, а затем использовать результат для фильтрации данных

df.filter(atLeastOneMatch).count

Это будет считать все строки , где хотя бы один столбец соответствует хешу , предоставленному map. По законам де Моргана его отрицание

!atLeastOneMatch

будет эквивалентно

sha2(b, 256) != b_hash AND sha2(c, 256) != c_hash AND sha2(d, 256) = d_hash

Другими словами, он будет соответствовать случаям, когда ни одно из значений не соответствует соответствующему хешу.

Если вы хотите найти строки, в которых хотя бы одно значение не соответствует хешу , вам следует использовать

sha2(b, 256) != b_hash OR sha2(c, 256) != c_hash OR sha2(d, 256) != d_hash

, который может быть составлен, как показано ниже

val atLeastOneMismatch: Column = map.foldLeft(lit(false)) { 
  case (acc, (c, h)) => acc or (sha2(col(c), 256) =!= h) 
}

Это отрицание

!atLeastOneMismatch

эквивалентно (законы де Моргана еще раз)

sha2(b, 256) = b_hash AND sha2(c, 256) = c_hash AND sha2(d, 256) = d_hash

и далее эквивалентно foldLeft с DataFrame аккумулятором и ===.

Итак, подведем итог - если C является набором столбцов, то:

  • ∈c∈C map (c) = sha2 (c, 256) - atLeastOneMatch
  • ∈c∈C map (c)! = Sha2 (c, 256) - !atLeastOneMatch
  • ∈c∈C map (c)! = Sha2 (c, 256) - atLeastOneMismatch
  • ∈c∈C map (c) = sha2 (c, 256) - !atLeastOneMismatch
...