Унифицировать схему для нескольких строк по json строк в Spark Dataframe - PullRequest
5 голосов
/ 08 мая 2020

У меня сложная проблема с строками в PySpark DataFrame, который содержит серию json строк.

Проблема заключается в том, что каждая строка может содержать схему, отличную от другой. , поэтому, когда я хочу преобразовать указанные строки в индексируемый тип данных в PySpark, мне нужна «унифицированная» схема.

Например, рассмотрим этот фрейм данных

import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))

Обратите внимание, что каждый Строка содержит разные версии строки json. Чтобы бороться с этим, я выполняю следующие преобразования

import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame


def convert_to_row(d: dict) -> Row:
    """Convert a dictionary to a SparkRow.

    Parameters
    ----------
    d : dict
        Dictionary to convert.

    Returns
    -------
    Row

    """
    return Row(**OrderedDict(sorted(d.items())))


def get_schema_from_dictionary(the_dict: dict):
    """Create a schema from a dictionary.

    Parameters
    ----------
    the_dict : dict

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema


def get_universal_schema(df: SparkDataFrame, column: str):
    """Given a dataframe, retrieve the "global" schema for the column.

    NOTE: It does this by merging across all the rows, so this will
          take a long time for larger dataframes.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
    mega_dict = {}
    for value in col_values:
        mega_dict = {**mega_dict, **value}

    return get_schema_from_dictionary(mega_dict)


def get_sample_schema(df, column):
    """Given a dataframe, sample a single value to convert.

    NOTE: This assumes that the dataframe has the same schema
          over all rows.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return get_universal_schema(df.limit(1), column)


def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
    """Convert json-string column to a subscriptable object.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.
    manual_schema : PysparkSchema, optional
        Schema understood by PySpark, by default None
    merge : bool, optional
        Parse the whole dataframe to extract a global schema, by default False

    Returns
    -------
    SparkDataFrame

    """
    if manual_schema is None or manual_schema == {}:
        if merge:
            schema = get_universal_schema(df, column)
        else:
            schema = get_sample_schema(df, column)
    else:
        schema = manual_schema

    return df.withColumn(column, fcn.from_json(column, schema))

Затем я могу просто сделать следующее, чтобы получить новый фрейм данных с унифицированной схемой

df = from_json(df, column='B', merge=True)
df.printSchema()
root
 |-- A: long (nullable = true)
 |-- B: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: double (nullable = true)
 |    |-- f: struct (nullable = true)
 |    |    |-- maybe_this: long (nullable = true)
 |    |    |-- some_other: struct (nullable = true)
 |    |    |    |-- A: long (nullable = true)

Теперь мы подошли к к сути вопроса. Поскольку я делаю это здесь col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()], я ограничен объемом памяти на главном узле.

Как я могу выполнить аналогичную процедуру, если вместо этого работа больше распределяется между каждым рабочим, прежде чем Собираю на мастер-узел?

Ответы [ 2 ]

2 голосов
/ 20 мая 2020

Если я правильно понимаю ваш вопрос, поскольку мы можем использовать RDD в качестве параметра path метода spark.read. json () и RDD распространяется и может уменьшить потенциальную проблему OOM с использованием метода collect() для большого набора данных, поэтому вы можете попробовать настроить функцию get_universal_schema на следующее:

def get_universal_schema(df: SparkDataFrame, column: str):
    return spark.read.json(df.select(column).rdd.map(lambda x: x[0])).schema

и сохранить две функции: get_sample_schema() и from_json() как есть.

1 голос
/ 25 мая 2020

Spark DataFrame предназначены для работы с данными, имеющими схему. DataFrame API предоставляет методы, которые полезны для данных с определенной схемой, например groupBy a столбец , или функции агрегирования для работы с столбцами , et c. et c.

Учитывая требования, представленные в вопросе, мне кажется, что во входных данных нет фиксированной схемы, и что вы не получите преимуществ от DataFrame API. Фактически, вместо этого он, скорее всего, добавит больше ограничений.

Я думаю, что лучше считать эти данные «бессистемными» и использовать API более низкого уровня - RDD s. RDD распределены по кластеру по определению. Итак, используя RDD API, вы можете сначала предварительно обработать данные (используя их как текст), а затем преобразовать их в DataFrame.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...