Spark Как получить количество ключей, измененных в двух JSONS в Scala? - PullRequest
10 голосов
/ 15 мая 2019

У меня есть два кадра данных, для которых я пытаюсь найти разницу. 2 кадра данных содержат массивы структуры. Мне не требуется 1 ключ в этой структуре. Поэтому я сначала удалил его, а затем преобразовал в строку JSON. При сравнении мне нужно знать, сколько элементов изменилось в этом массиве (Json). Есть ли способ сделать это в искре?

И base_data_set, и target_data_set содержат ID и KEY. KEY является array<Struct>:

root
 |-- id: string (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key3: string (nullable = false)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key4: string (nullable = true)

val temp_base = base_data_set
    .withColumn("base_result", explode(base_data_set(RESULT)))
    .withColumn("base",
        struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("base")).as("base_picks"))

val temp_target = target_data_set
    .withColumn("target_result", explode(target_data_set(RESULT)))
    .withColumn("target",
        struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("target")).as("target_picks"))    


val common_keys = temp_base
    .join(temp_target, temp_base(ID) ===  temp_target(ID))
    .drop(temp_target(ID))
    .withColumn("isModified", $"base_picks" =!= $"target_picks") 

Возвращает ложь даже при 1 изменении элемента, но мне нужно возвращать ложь только тогда, когда изменилось более n (скажем, n = 3) элементов (в массиве). Кто-нибудь, пожалуйста, посоветуйте мне, как мне этого достичь?

1 Ответ

5 голосов
/ 20 мая 2019

Я не совсем уверен, если вы это имеете в виду, потому что некоторые части вашего вопроса нелегко понять (по крайней мере, для меня).

Я использовал два файла json для имитации вашей схемы. Они выглядят так:

base_data_set:
{ "id": 1,  "result": [ {"key1":  23, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 2,  "result": [ {"key1":  23, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 3,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}
{ "id": 4,  "result": [ {"key1":  "4", "key2": "5", "key3":  "6"}, {"key1":  "1", "key2": "2", "key3":  "3"},   {"key1":  "7", "key2": "8", "key3":  "9"}]}

target_data_set:
{ "id": 1,  "result": [ {"key1":  24, "key2": "qwerty", "key3":  "abc"}, {"key1":  24, "key2": "asdf", "key3":  "abc"},  {"key1":  25, "key2": "xcv", "key3":  "abc"}]}
{ "id": 2,  "result": [ {"key1":  23, "key2": "qwertu", "key3":  "abc"}, {"key1":  24, "key2": "asdfg", "key3":  "abc"},  {"key1":  25, "key2": "xcvv", "key3":  "abc"}]}
{ "id": 3,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}
{ "id": 4,  "result": [ {"key1":  "1", "key2": "2", "key3":  "3"}, {"key1":  "4", "key2": "5", "key3":  "6"},  {"key1":  "7", "key2": "8", "key3":  "9"}]}

Как видите, первая строка отличается только в одной из структур и массиве результатов, тогда как во второй строке все структуры различаются. Строки 3 и 4 показывают случай, когда мне не ясно, считаете ли вы это изменением. Структуры одинаковы для обеих таблиц, однако их порядок изменений в строке 4.

Начиная с ваших начальных преобразований, я удалил функцию to_json, потому что она преобразовывает структурированные элементы в строку, что затрудняет сравнение:

val temp_base = base_data_set
  .withColumn("base_result", explode(base_data_set("result")))
  .withColumn("base",
    struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
  .groupBy("id")
  .agg(collect_list("base").as("base_picks"))


val temp_target = target_data_set
  .withColumn("target_result", explode(target_data_set(RESULT)))
  .withColumn("target",
    struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
  .groupBy(ID)
  .agg(collect_list("target").as("target_picks"))


val common_keys = temp_base
  .join(temp_target, temp_base(ID) ===  temp_target(ID))
  .drop(temp_target(ID))
  .withColumn("isModified", $"base_picks" =!= $"target_picks")

В дальнейшем вы можете использовать пользовательскую функцию для сравнения результатов collect_list. Он берет содержимое двух столбцов и подсчитывает, сколько элементов различаются:

  val numChangedStruct = udf {
  (left: mutable.WrappedArray[Object], right: mutable.WrappedArray[Object]) =>
    left.zip(right).count(x => !x._1.equals(x._2))
}

И применено:

common_keys.withColumn("numChangedStruct", numChangedStruct($"base_picks", $"target_picks")).show(20, false)

+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|id |base_picks                                    |target_picks                                    |isModified|numChangedStruct|
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|1  |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[24,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]  |true      |1               |
|3  |[[1,2,3], [4,5,6], [7,8,9]]                   |[[1,2,3], [4,5,6], [7,8,9]]                     |false     |0               |
|2  |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[23,qwertu,abc], [24,asdfg,abc], [25,xcvv,abc]]|true      |3               |
|4  |[[4,5,6], [1,2,3], [7,8,9]]                   |[[1,2,3], [4,5,6], [7,8,9]]                     |true      |2               |
+---+----------------------------------------------+------------------------------------------------+----------+----------------+

Однако это решение зависит от порядка элементов в «результате», как вы можете видеть из строк с id 3 и 4.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...