Я не совсем уверен, если вы это имеете в виду, потому что некоторые части вашего вопроса нелегко понять (по крайней мере, для меня).
Я использовал два файла json для имитации вашей схемы. Они выглядят так:
base_data_set:
{ "id": 1, "result": [ {"key1": 23, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 2, "result": [ {"key1": 23, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 3, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
{ "id": 4, "result": [ {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "7", "key2": "8", "key3": "9"}]}
target_data_set:
{ "id": 1, "result": [ {"key1": 24, "key2": "qwerty", "key3": "abc"}, {"key1": 24, "key2": "asdf", "key3": "abc"}, {"key1": 25, "key2": "xcv", "key3": "abc"}]}
{ "id": 2, "result": [ {"key1": 23, "key2": "qwertu", "key3": "abc"}, {"key1": 24, "key2": "asdfg", "key3": "abc"}, {"key1": 25, "key2": "xcvv", "key3": "abc"}]}
{ "id": 3, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
{ "id": 4, "result": [ {"key1": "1", "key2": "2", "key3": "3"}, {"key1": "4", "key2": "5", "key3": "6"}, {"key1": "7", "key2": "8", "key3": "9"}]}
Как видите, первая строка отличается только в одной из структур и массиве результатов, тогда как во второй строке все структуры различаются. Строки 3 и 4 показывают случай, когда мне не ясно, считаете ли вы это изменением. Структуры одинаковы для обеих таблиц, однако их порядок изменений в строке 4.
Начиная с ваших начальных преобразований, я удалил функцию to_json, потому что она преобразовывает структурированные элементы в строку, что затрудняет сравнение:
val temp_base = base_data_set
.withColumn("base_result", explode(base_data_set("result")))
.withColumn("base",
struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
.groupBy("id")
.agg(collect_list("base").as("base_picks"))
val temp_target = target_data_set
.withColumn("target_result", explode(target_data_set(RESULT)))
.withColumn("target",
struct($"target_result.key1", $"target_result.key2", $"target_result.key3"))
.groupBy(ID)
.agg(collect_list("target").as("target_picks"))
val common_keys = temp_base
.join(temp_target, temp_base(ID) === temp_target(ID))
.drop(temp_target(ID))
.withColumn("isModified", $"base_picks" =!= $"target_picks")
В дальнейшем вы можете использовать пользовательскую функцию для сравнения результатов collect_list
. Он берет содержимое двух столбцов и подсчитывает, сколько элементов различаются:
val numChangedStruct = udf {
(left: mutable.WrappedArray[Object], right: mutable.WrappedArray[Object]) =>
left.zip(right).count(x => !x._1.equals(x._2))
}
И применено:
common_keys.withColumn("numChangedStruct", numChangedStruct($"base_picks", $"target_picks")).show(20, false)
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|id |base_picks |target_picks |isModified|numChangedStruct|
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
|1 |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[24,qwerty,abc], [24,asdf,abc], [25,xcv,abc]] |true |1 |
|3 |[[1,2,3], [4,5,6], [7,8,9]] |[[1,2,3], [4,5,6], [7,8,9]] |false |0 |
|2 |[[23,qwerty,abc], [24,asdf,abc], [25,xcv,abc]]|[[23,qwertu,abc], [24,asdfg,abc], [25,xcvv,abc]]|true |3 |
|4 |[[4,5,6], [1,2,3], [7,8,9]] |[[1,2,3], [4,5,6], [7,8,9]] |true |2 |
+---+----------------------------------------------+------------------------------------------------+----------+----------------+
Однако это решение зависит от порядка элементов в «результате», как вы можете видеть из строк с id 3 и 4.