Если у вас есть только эти данные в корзине S3, вы можете прочитать их напрямую:
data = spark.read.json('s3:/.../Json_gzips/')
если мы хотим прочитать только некоторые данные, и у нас есть список файлов / URL-адресов, тогда мы может делать следующее:
from functools import reduce
lst=['s3:/....', 's3:/...']
dfs = [spark.read.json(file) for file in lst]
udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
Как это работает:
- импорт первой строки
reduce
функция, упрощающая код - вторая строка определяет список ресурсов
- третий - генерирует фрейм данных для каждого элемента в списке
- последний - выполняет попарно
union
операцию на всех фреймах данных
Здесь есть предостережения, которые необходимо учтено:
- этот код c полагается на автоматическое определение схемы, выполненное
spark.read.json
, и это может привести к наличию фреймов данных с другой схемой, если некоторые данные отсутствуют в некоторых files - схема для всех фреймов данных, участвующих в объединении, должна быть одинаковой, иначе вы получите ошибку
Но мы можем решить эту проблему, явно указав схему при чтении данных. Я часто использую следующий «прием» - помещаю один или два образца в отдельный файл и читаю его с помощью Spark, и использую предполагаемую схему для принудительного выполнения схемы во время чтения - это также будет быстрее, поскольку схема уже предоставлена. Например, как это можно сделать:
>>> lst = ["file1", "file2", "file3"]
# we're reading the data without schema
>>> dfs = [spark.read.json(file) for file in lst]
# we get different lengths of the schemas
>>> map(lambda df: len(df.schema), dfs)
[28, 28, 27]
# we're getting error when trying to do union
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
мы читаем с применением схемы:
>>> sample_schema = spark.read.json("sample_file.json").schema
>>> dfs = [spark.read.schema(sample_schema).json(file) for file in lst]
# all lengths are the same
>>> map(lambda df: len(df.schema), dfs)
[28, 28, 28]
# and union works
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)