Как эффективно читать вложенные JSON в PySpark? - PullRequest
0 голосов
/ 17 июня 2020

У меня проблемы с эффективным чтением и анализом большого количества потоковых файлов в Pyspark!

Контекст

Вот схема файла потока, который я читаю в JSON. Пробелы редактируются в целях конфиденциальности.

root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1 string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |-- longitude: string (nullable = true)
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)

Текущий метод чтения и синтаксического анализа (который работает, но занимает СЛИШКОМ много времени)

  • Хотя следующий метод работает и сам по себе является решением даже для начала чтения в файлах , этот метод занимает очень много времени, когда количество файлов увеличивается на тысячи
  • Размер каждого файла составляет около 10 МБ
  • Файлы являются важными «потоковыми» файлами и имеют такие имена, как это s3://bucket_name/raw/2020/03/05/04/file-stream-6-2020-03-05-04-01-04-123-b978-2e2b-5672-fa243fs4aeb4. Поэтому я прочитал его как JSON в Pyspark (не уверен, что еще я бы прочитал его?)
    • Если вы заметили, я призываю заменить restaurant_id на '\ n {"restaurant_id' , это потому, что если я этого не сделаю, тогда операция чтения будет читать только первую запись в файле и игнорирует другое содержимое ...
# Reading multiple files in the dir
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")                              
          .values()
          .flatMap(lambda x: x
                   .replace('{"restaurant_id','\n{"restaurant_id' ).split('\n')))


# explode here to have restaurant_id, and nested data
exploded_source_df_1 =  source_df_1.select(col('restaurant_id'),
  explode(col('location_info')).alias('location_info') )


# Via SQL operation : this will solve the problem for parsing 
exploded_source_df_1.createOrReplaceTempView('result_1')

subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_infos.latitude,location_infos.longitude,location_infos.timezone 
from result_1
'''
)

Вещи, с которыми я хотел бы помочь:

  • (1) Есть ли более быстрый способ прочитать это?
  • (2) Если Я пытаюсь кэшировать / сохранять фрейм данных, когда я смогу это сделать, поскольку кажется, что .values().flatMap(lambda x: x.replace('{"restaurant_id','\n{"restaurant_id' ) - это действие само по себе, поэтому, если я вызываю persist () в конце, кажется, что он повторил все чтение?

Вы можете сослаться на эту ветку, чтобы узнать, как я пришел к этому решению в первую очередь: ссылка . Большое спасибо за ваше время

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...