У меня проблемы с эффективным чтением и анализом большого количества потоковых файлов в Pyspark!
Контекст
Вот схема файла потока, который я читаю в JSON. Пробелы редактируются в целях конфиденциальности.
root
|-- location_info: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- restaurant_type: string (nullable = true)
| | |
| | |
| | |-- other_data: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- other_data_1 string (nullable = true)
| | | | |-- other_data_2: string (nullable = true)
| | | | |-- other_data_3: string (nullable = true)
| | | | |-- other_data_4: string (nullable = true)
| | | | |-- other_data_5: string (nullable = true)
| | |
| | |-- latitude: string (nullable = true)
| | |-- longitude: string (nullable = true)
| | |-- timezone: string (nullable = true)
|-- restaurant_id: string (nullable = true)
Текущий метод чтения и синтаксического анализа (который работает, но занимает СЛИШКОМ много времени)
- Хотя следующий метод работает и сам по себе является решением даже для начала чтения в файлах , этот метод занимает очень много времени, когда количество файлов увеличивается на тысячи
- Размер каждого файла составляет около 10 МБ
- Файлы являются важными «потоковыми» файлами и имеют такие имена, как это
s3://bucket_name/raw/2020/03/05/04/file-stream-6-2020-03-05-04-01-04-123-b978-2e2b-5672-fa243fs4aeb4
. Поэтому я прочитал его как JSON в Pyspark (не уверен, что еще я бы прочитал его?) - Если вы заметили, я призываю заменить restaurant_id на '\ n {"restaurant_id' , это потому, что если я этого не сделаю, тогда операция чтения будет читать только первую запись в файле и игнорирует другое содержимое ...
# Reading multiple files in the dir
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
.values()
.flatMap(lambda x: x
.replace('{"restaurant_id','\n{"restaurant_id' ).split('\n')))
# explode here to have restaurant_id, and nested data
exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
explode(col('location_info')).alias('location_info') )
# Via SQL operation : this will solve the problem for parsing
exploded_source_df_1.createOrReplaceTempView('result_1')
subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_infos.latitude,location_infos.longitude,location_infos.timezone
from result_1
'''
)
Вещи, с которыми я хотел бы помочь:
- (1) Есть ли более быстрый способ прочитать это?
- (2) Если Я пытаюсь кэшировать / сохранять фрейм данных, когда я смогу это сделать, поскольку кажется, что
.values().flatMap(lambda x: x.replace('{"restaurant_id','\n{"restaurant_id' )
- это действие само по себе, поэтому, если я вызываю persist () в конце, кажется, что он повторил все чтение?
Вы можете сослаться на эту ветку, чтобы узнать, как я пришел к этому решению в первую очередь: ссылка . Большое спасибо за ваше время