Содержимое обоих объектов является динамическим c, поэтому я не знаю всей структуры заранее
Это похоже на аргумент против Spark SQL (и более для RDD API) ), поскольку оптимизатор делает Spark SQL таким замечательным и должен иметь информацию о схеме для применения оптимизаций к вашему структурированному запросу.
Однако это не всегда может быть правдой в соответствии с требованиями.
, но путь верхнего уровня в каждом из них постоянен
Если за object1
всегда следует element1
, как и object2
, и они используются в join
Этого может быть достаточно.
Вам просто нужно «деструктировать» непрозрачный формат JSON и сделать его похожим на строку. Используйте из_ json стандартных функций, например,
из_ json (e: столбец, схема: столбец): столбец
from_ json (e: Column, схема: DataType): Column
Анализирует столбец, содержащий строку JSON, в MapType со StringType в качестве типа ключей, StructType или ArrayType из StructTypes с указанной схемой. Возвращает ноль, в случае непарсируемой строки.
Эта часть схемы, являющаяся динамической c, может обрабатываться этим аргументом schema
, который может измениться во время выполнения. Он может быть создан как часть вашего приложения Spark (когда входной аргумент schema: DataType
) или может входить в состав DataFrame с вашими входными данными (когда schema: Column
).
Вы также можете использовать get_json_object
или json_tuple
стандартные функции.
Начиная с Spark 2.4.0, вы также можете использовать schema_of_json
для вывода схемы столбца в формате DDL для более ранних функций.
Lots опций, нет?
(псевдо) Код
Предположим, что следующие два набора данных с JSON -кодированными строками.
val object1 = Seq("""
{
"object1": {
"element1": "element1value",
"object1_only": 1
}
}
""").toDF("json")
val object2 = Seq("""
{
"object2": {
"element1": "element1value",
"object2_only": 2
}
}
""").toDF("json")
Вы можете извлечь поле для присоединения, используя стандартную функцию get_json_object
.
val obj1_el1 = object1
.select(get_json_object($"json", "$.object1.element1"))
scala> obj1_el1.show
+-----------------------------------------+
|get_json_object(json, $.object1.element1)|
+-----------------------------------------+
| element1value|
+-----------------------------------------+
Набор объединенных данных будет выглядеть следующим образом:
val o1 = object1
.withColumn("join_column", get_json_object($"json", "$.object1.element1"))
val o2 = object2
.withColumn("join_column", get_json_object($"json", "$.object2.element1"))
val s = o1.join(o2, Seq("join_column"))