Как объединить кадры данных с JSON -кодированными строками по заданному c пути? - PullRequest
1 голос
/ 20 января 2020

Я хочу объединить 2 фрейма данных (оба содержат большие тома JSON), но по указанному пути c. Я надеялся, что смогу сделать это в рамках одной и той же операции, и мне не нужно преобразовывать его как RDD после JOIN. Я предполагаю, что это возможно сделать элегантно через Spark SQL.

Содержимое обоих объектов Dynami c, поэтому я не знаю всей структуры заранее, но путь верхнего уровня в каждом из них является постоянным

объект 1 документ

{
   "object1": {
        "element1" "element1value"
        ...
   }

}

объект 2 документ

{

   "object2": {

    "element1" "element1value"
    ...
   }

}

Ожидаемый результат

{
   "object1Parent": {
        "element1" "element1value"
        ...
   },
   "object2Parent": {

       "object2": {

        "element1" "element1value"
        ...
       }
   }

}

Операция соединения

SQL: "SELECT * FROM object1 r JOIN object2 s ON r.element1 = s.element2"

1 Ответ

1 голос
/ 22 января 2020

Содержимое обоих объектов является динамическим c, поэтому я не знаю всей структуры заранее

Это похоже на аргумент против Spark SQL (и более для RDD API) ), поскольку оптимизатор делает Spark SQL таким замечательным и должен иметь информацию о схеме для применения оптимизаций к вашему структурированному запросу.

Однако это не всегда может быть правдой в соответствии с требованиями.

, но путь верхнего уровня в каждом из них постоянен

Если за object1 всегда следует element1, как и object2, и они используются в join Этого может быть достаточно.

Вам просто нужно «деструктировать» непрозрачный формат JSON и сделать его похожим на строку. Используйте из_ json стандартных функций, например,

из_ json (e: столбец, схема: столбец): столбец

from_ json (e: Column, схема: DataType): Column

Анализирует столбец, содержащий строку JSON, в MapType со StringType в качестве типа ключей, StructType или ArrayType из StructTypes с указанной схемой. Возвращает ноль, в случае непарсируемой строки.

Эта часть схемы, являющаяся динамической c, может обрабатываться этим аргументом schema, который может измениться во время выполнения. Он может быть создан как часть вашего приложения Spark (когда входной аргумент schema: DataType) или может входить в состав DataFrame с вашими входными данными (когда schema: Column).

Вы также можете использовать get_json_object или json_tuple стандартные функции.

Начиная с Spark 2.4.0, вы также можете использовать schema_of_json для вывода схемы столбца в формате DDL для более ранних функций.

Lots опций, нет?

(псевдо) Код

Предположим, что следующие два набора данных с JSON -кодированными строками.

val object1 = Seq("""
{
  "object1": {
    "element1": "element1value",
    "object1_only": 1
  }
}
""").toDF("json")

val object2 = Seq("""
{
  "object2": {
    "element1": "element1value",
    "object2_only": 2
  }
}
""").toDF("json")

Вы можете извлечь поле для присоединения, используя стандартную функцию get_json_object.

val obj1_el1 = object1
  .select(get_json_object($"json", "$.object1.element1"))
scala> obj1_el1.show
+-----------------------------------------+
|get_json_object(json, $.object1.element1)|
+-----------------------------------------+
|                            element1value|
+-----------------------------------------+

Набор объединенных данных будет выглядеть следующим образом:

val o1 = object1
  .withColumn("join_column", get_json_object($"json", "$.object1.element1"))
val o2 = object2
  .withColumn("join_column", get_json_object($"json", "$.object2.element1"))
val s = o1.join(o2, Seq("join_column"))
...