Как сгладить массив во вложенном JSON в AWS клей с помощью Pyspark? - PullRequest
0 голосов
/ 04 октября 2019

Я пытаюсь сгладить файл JSON, чтобы иметь возможность загрузить его в PostgreSQL, все в AWS Glue. Я использую PySpark. Используя сканер, я сканирую S3 JSON и создаю таблицу. Затем я использую сценарий ETL Glue для:

  • чтения просканированной таблицы
  • и использования функции 'Relationalize', чтобы сгладить файл
  • для преобразования динамического фрейма вdataframe
  • попытаться «взорвать» поле request.data

Сценарий на данный момент:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")

df0 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")

df1 = df0.select(dfc_root_table_name)

df2 = df1.toDF()

df2 = df1.select(explode(col('`request.data`')).alias("request_data"))

<then i write df1 to a PostgreSQL database which works fine>

Проблемы, с которыми я сталкиваюсь:

Функция 'Relationalize' работает хорошо, за исключением поля request.data, которое становится bigint и, следовательно, 'explode' не работает.

Разнесение невозможно выполнить без использования «Relationalize» в JSON, что связано со структурой данных. В частности, ошибка: «org.apache.spark.sql.AnalysisException: не удается разрешить« взорвать (request.data) »из-за несоответствия типов данных: ввод для разнесения функции должен быть массив или тип карты, а не bigint»

Если я сначала пытаюсь сделать динамический фрейм фреймом данных, то у меня возникает такая проблема: "py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o72.jdbc.: Java.lang.IllegalArgumentException: Не удается получить тип JDBC для структуры... "

Я также попытался загрузить классификатор, чтобы данные сгладились в самом обходе, но AWS подтвердил, что это не сработает.

Формат исходного файла JSON:следующим образом, что я пытаюсь нормализовать:

- field1
- field2
- {}
  - field3
  - {}
    - field4
    - field5
  - []
    - {}
      - field6
      - {}
        - field7
        - field8
        - {}
          - field9
          - {}
            - field10

Ответы [ 2 ]

0 голосов
/ 04 октября 2019
# Flatten nested df  
def flatten_df(nested_df): 
    for col in nested_df.columns:


    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for col in array_cols:
        nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))

    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    if len(nested_cols) == 0:
        return nested_df

    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']

    flat_df = nested_df.select(flat_cols +
                            [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])

    return flatten_df(flat_df)

df=flatten_df(df)

Он заменит все точки подчеркиванием. Обратите внимание, что для включения значения Null используется explode_outer, а не explode, если сам массив равен нулю. Эта функция доступна только в spark v2.4+.

Также не забывайте, что взрывающийся массив добавит больше дубликатов, и общий размер строки увеличится. Уплощение структуры увеличит размер столбца. Короче говоря, ваш оригинальный df будет взрываться по горизонтали и вертикали. Это может замедлить обработку данных позже.

Поэтому я рекомендую идентифицировать данные, относящиеся к функциям, и сохранять только те данные в postgresql и исходные файлы json в s3.

0 голосов
/ 04 октября 2019

Как только вы рационализируете столбец json, вам не нужно его взрывать. Relationalize преобразует вложенный JSON в пары ключ-значение на самом внешнем уровне документа JSON. Преобразованные данные содержат список исходных ключей из вложенного JSON, разделенных точками.

Пример:

Вложенный json:

{
    "player": {
        "username": "user1",
        "characteristics": {
            "race": "Human",
            "class": "Warlock",
            "subclass": "Dawnblade",
            "power": 300,
            "playercountry": "USA"
        },
        "arsenal": {
            "kinetic": {
                "name": "Sweet Business",
                "type": "Auto Rifle",
                "power": 300,
                "element": "Kinetic"
            },
            "energy": {
                "name": "MIDA Mini-Tool",
                "type": "Submachine Gun",
                "power": 300,
                "element": "Solar"
            },
            "power": {
                "name": "Play of the Game",
                "type": "Grenade Launcher",
                "power": 300,
                "element": "Arc"
            }
        },
        "armor": {
            "head": "Eye of Another World",
            "arms": "Philomath Gloves",
            "chest": "Philomath Robes",
            "leg": "Philomath Boots",
            "classitem": "Philomath Bond"
        },
        "location": {
            "map": "Titan",
            "waypoint": "The Rig"
        }
    }
}

Сглаженный json после рационализации:

{
    "player.username": "user1",
    "player.characteristics.race": "Human",
    "player.characteristics.class": "Warlock",
    "player.characteristics.subclass": "Dawnblade",
    "player.characteristics.power": 300,
    "player.characteristics.playercountry": "USA",
    "player.arsenal.kinetic.name": "Sweet Business",
    "player.arsenal.kinetic.type": "Auto Rifle",
    "player.arsenal.kinetic.power": 300,
    "player.arsenal.kinetic.element": "Kinetic",
    "player.arsenal.energy.name": "MIDA Mini-Tool",
    "player.arsenal.energy.type": "Submachine Gun",
    "player.arsenal.energy.power": 300,
    "player.arsenal.energy.element": "Solar",
    "player.arsenal.power.name": "Play of the Game",
    "player.arsenal.power.type": "Grenade Launcher",
    "player.arsenal.power.power": 300,
    "player.arsenal.power.element": "Arc",
    "player.armor.head": "Eye of Another World",
    "player.armor.arms": "Philomath Gloves",
    "player.armor.chest": "Philomath Robes",
    "player.armor.leg": "Philomath Boots",
    "player.armor.classitem": "Philomath Bond",
    "player.location.map": "Titan",
    "player.location.waypoint": "The Rig"
}

Таким образом, в вашем случае request.data уже является новым столбцом, сглаженным из столбца запроса, и его тип интерпретируется как bigint посредством spark.

Ссылка: Упрощение / запрос вложенного JSON с помощью реляционного преобразования AWS glue

...