Разбор вложенных JSON в фрейм данных Spark с использованием PySpark - PullRequest
0 голосов
/ 20 марта 2020

Мне бы очень понравилась помощь в разборе вложенных JSON данных с использованием PySpark- SQL. Данные имеют следующую схему (пробелы редактируются в целях конфиденциальности ...)

Схема

root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1 string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- longitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)

Моя цель Я бы по сути, хочу получить данные в следующем фрейме данных

restaurant_id | latitude | longtitude | timezone 

Я пробовал

dfj = spark.read.option("multiLine", False).json("/file/path")

result = dfj.select(col('restaurant_id'),
  explode(col('location_info')).alias('location_info') )

# SQL operation
result.createOrReplaceTempView('result')

subset_data = spark.sql(
'''
SELECT restaurant_id, location_info.latitude,location_info.longitude,location_info.timestamp  
FROM result

'''
).show()  

# Also tried this to read in
source_df_1 = spark.read.json(sc.wholeTextFiles("/file/path")
          .values()
          .flatMap(lambda x: x
                   .replace("{", "#!#")
                   .split("#!#")))

Но, как ни странно, это дает мне следующее только для первого объекта или идентификатора ресторана

+-------+-----------+------------+--------------------+
|restaurant_id|latitude|longitude|timestamp|
+-------+-----------+------------+--------------------+
| 25|2.0|-8.0|2020-03-06T03:00:...|
| 25|2.0|-8.0|2020-03-06T03:00:...|
| 25|2.0|-8.0|2020-03-06T03:00:...|
| 25|2.0|-8.0|2020-03-06T03:01:...|
| 25|2.0|-8.0|2020-03-06T03:01:...|
+-------+-----------+------------+--------------------+

Мое исследование показало, что это может быть связано с тем, как файлы JSON структурированы в источнике. Например:

{}{
}{
}

Таким образом, не многолинейный или что-то. Хотите знать, что с этим делать?

Большое спасибо за чтение, любая помощь будет принята с благодарностью. Я знаю, что всегда могу рассчитывать на ТА, чтобы помочь

1 Ответ

1 голос
/ 04 апреля 2020

Мне удалось решить эту проблему, прочитав описанный выше файл JSON, надеюсь, это поможет! :

# Reading multiple files in the dir
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")                              
          .values()
          .flatMap(lambda x: x
                   .replace('{"restaurant_id','\n{"restaurant_id' ).split('\n')))


# explode here to have restaurant_id, and nested data
exploded_source_df_1 =  source_df_1.select(col('restaurant_id'),
  explode(col('location_info')).alias('location_info') )


# Via SQL operation : this will solve the problem for parsing 
exploded_source_df_1.createOrReplaceTempView('result_1')

subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_infos.latitude,location_infos.longitude,location_infos.timestamp 
from result_1
'''
).persist()
...