У меня сложная проблема с строками в PySpark DataFrame, который содержит серию json строк.
Проблема заключается в том, что каждая строка может содержать схему, отличную от другой. , поэтому, когда я хочу преобразовать указанные строки в индексируемый тип данных в PySpark, мне нужна «унифицированная» схема.
Например, рассмотрим этот фрейм данных
import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))
Обратите внимание, что каждый Строка содержит разные версии строки json. Чтобы бороться с этим, я выполняю следующие преобразования
import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame
def convert_to_row(d: dict) -> Row:
"""Convert a dictionary to a SparkRow.
Parameters
----------
d : dict
Dictionary to convert.
Returns
-------
Row
"""
return Row(**OrderedDict(sorted(d.items())))
def get_schema_from_dictionary(the_dict: dict):
"""Create a schema from a dictionary.
Parameters
----------
the_dict : dict
Returns
-------
schema
Schema understood by PySpark.
"""
return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema
def get_universal_schema(df: SparkDataFrame, column: str):
"""Given a dataframe, retrieve the "global" schema for the column.
NOTE: It does this by merging across all the rows, so this will
take a long time for larger dataframes.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
Returns
-------
schema
Schema understood by PySpark.
"""
col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
mega_dict = {}
for value in col_values:
mega_dict = {**mega_dict, **value}
return get_schema_from_dictionary(mega_dict)
def get_sample_schema(df, column):
"""Given a dataframe, sample a single value to convert.
NOTE: This assumes that the dataframe has the same schema
over all rows.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
Returns
-------
schema
Schema understood by PySpark.
"""
return get_universal_schema(df.limit(1), column)
def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
"""Convert json-string column to a subscriptable object.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
manual_schema : PysparkSchema, optional
Schema understood by PySpark, by default None
merge : bool, optional
Parse the whole dataframe to extract a global schema, by default False
Returns
-------
SparkDataFrame
"""
if manual_schema is None or manual_schema == {}:
if merge:
schema = get_universal_schema(df, column)
else:
schema = get_sample_schema(df, column)
else:
schema = manual_schema
return df.withColumn(column, fcn.from_json(column, schema))
Затем я могу просто сделать следующее, чтобы получить новый фрейм данных с унифицированной схемой
df = from_json(df, column='B', merge=True)
df.printSchema()
root
|-- A: long (nullable = true)
|-- B: struct (nullable = true)
| |-- a: long (nullable = true)
| |-- b: string (nullable = true)
| |-- c: long (nullable = true)
| |-- d: double (nullable = true)
| |-- f: struct (nullable = true)
| | |-- maybe_this: long (nullable = true)
| | |-- some_other: struct (nullable = true)
| | | |-- A: long (nullable = true)
Теперь мы подошли к к сути вопроса. Поскольку я делаю это здесь col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
, я ограничен объемом памяти на главном узле.
Как я могу выполнить аналогичную процедуру, если вместо этого работа больше распределяется между каждым рабочим, прежде чем Собираю на мастер-узел?