Свести сложную JSON-схему с помощью pyspark - PullRequest
0 голосов
/ 09 июля 2019

Я пытаюсь сгладить сложную структуру JSON, содержащую вложенные массивы, элементы структуры, используя универсальную функцию, которая должна работать для любых файлов JSON с любой схемой.

Ниже приведена часть примера структуры JSON, которую я хочу сгладить

root
 |-- Data: struct (nullable = true)
 |    |-- Record: struct (nullable = true)
 |    |    |-- FName: string (nullable = true)
 |    |    |-- LName: long (nullable = true)
 |    |    |-- Address: struct (nullable = true)
 |    |    |    |-- Applicant: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- Id: long (nullable = true)
 |    |    |    |    |    |-- Type: string (nullable = true)
 |    |    |    |    |    |-- Option: long (nullable = true)
 |    |    |    |-- Location: string (nullable = true)
 |    |    |    |-- Town: long (nullable = true)
 |    |    |-- IsActive: boolean (nullable = true)
 |-- Id: string (nullable = true)

до

root
 |-- Data_Record_FName: string (nullable = true)
 |-- Data_Record_LName: long (nullable = true)
 |-- Data_Record_Address_Applicant_Id: long (nullable = true)
 |-- Data_Record_Address_Applicant_Type: string (nullable = true)
 |-- Data_Record_Address_Applicant_Option: long (nullable = true)
 |-- Data_Record_Address_Location: string (nullable = true)
 |-- Data_Record_Address_Town: long (nullable = true)
 |-- Data_Record_IsActive: boolean (nullable = true)
 |-- Id: string (nullable = true)

Я использую приведенный ниже код, как указано в теме ниже

Как сгладить структуру в кадре данных Spark?

def flatten_df(nested_df, layers):
    flat_cols = []
    nested_cols = []
    flat_df = []

    flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'])
    nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'])

    flat_df.append(nested_df.select(flat_cols[0] +
                               [col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols[0]
                                for c in nested_df.select(nc+'.*').columns])
                  )
    for i in range(1, layers):
        print (flat_cols[i-1])
        flat_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] != 'struct'])
        nested_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] == 'struct'])

        flat_df.append(flat_df[i-1].select(flat_cols[i] +
                                [col(nc+'.'+c).alias(nc+'_'+c)
                                    for nc in nested_cols[i]
                                    for c in flat_df[i-1].select(nc+'.*').columns])
        )

    return flat_df[-1]

my_flattened_df = flatten_df(jsonDF, 10)
my_flattened_df.printSchema()   

Но это не работает для элементов массива. С кодом выше я получаю вывод, как показано ниже. Не могли бы вы помочь. Как я могу изменить этот кусок кода, чтобы включить массивы тоже.

root
 |-- Data_Record_FName: string (nullable = true)
 |-- Data_Record_LName: long (nullable = true)
 |-- Data_Record_Address_Applicant: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- Type: string (nullable = true)
 |    |    |-- Option: long (nullable = true)
 |-- Data_Record_Address_Location: string (nullable = true)
 |-- Data_Record_Address_Town: long (nullable = true)
 |-- Data_Record_IsActive: boolean (nullable = true)
 |-- Id: string (nullable = true)

Это не дубликат, поскольку в нем нет сообщения об универсальной функции для выравнивания сложной схемы JSON, которая также включает в себя массивы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...