Как прочитать Json Gzip файлов из S3 в список? - PullRequest
1 голос
/ 13 июля 2020

У меня есть несколько файлов в S3.

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket='cw-milenko-tests', Prefix='Json_gzips'):
    contents = page["Contents"]
    for c in contents:
        if (c['Key']).startswith('Json_gzips/tick_calculated_3'):
            print(c['Key'])

Вывод

Json_gzips/tick_calculated_3_2020-05-27T00-05-51.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-13-23.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-17-36.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-28-10.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-30-43.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-34-56.json.gz
Json_gzips/tick_calculated_3_2020-05-27T00-38-29.json.gz

Я хочу, чтобы все эти файлы были помещены во фрейм данных Spark, затем выполнить объединение и сохранить как единый паркетный файл. Я уже просил определить схему для PySpark и получил полезный ответ. Как мне отредактировать свой код?

data = spark.read.json(c['Key'])

данные должны быть сохранены в big_data = [data1, data2, ...]

Это позволит мне объединиться.

bigdf = reduce(DataFrame.unionAll, big_data)

Как исправить?

1 Ответ

1 голос
/ 15 июля 2020

Если у вас есть только эти данные в корзине S3, вы можете прочитать их напрямую:

data = spark.read.json('s3:/.../Json_gzips/')

если мы хотим прочитать только некоторые данные, и у нас есть список файлов / URL-адресов, тогда мы может делать следующее:

from functools import reduce
lst=['s3:/....', 's3:/...']
dfs = [spark.read.json(file) for file in lst]
udfs = reduce(lambda df1, df2: df1.union(df2), dfs)

Как это работает:

  • импорт первой строки reduce функция, упрощающая код
  • вторая строка определяет список ресурсов
  • третий - генерирует фрейм данных для каждого элемента в списке
  • последний - выполняет попарно union операцию на всех фреймах данных

Здесь есть предостережения, которые необходимо учтено:

  • этот код c полагается на автоматическое определение схемы, выполненное spark.read.json, и это может привести к наличию фреймов данных с другой схемой, если некоторые данные отсутствуют в некоторых files
  • схема для всех фреймов данных, участвующих в объединении, должна быть одинаковой, иначе вы получите ошибку

Но мы можем решить эту проблему, явно указав схему при чтении данных. Я часто использую следующий «прием» - помещаю один или два образца в отдельный файл и читаю его с помощью Spark, и использую предполагаемую схему для принудительного выполнения схемы во время чтения - это также будет быстрее, поскольку схема уже предоставлена. Например, как это можно сделать:

>>> lst = ["file1", "file2", "file3"]
# we're reading the data without schema
>>> dfs = [spark.read.json(file) for file in lst]
# we get different lengths of the schemas
>>> map(lambda df: len(df.schema), dfs)
[28, 28, 27]
# we're getting error when trying to do union
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)

мы читаем с применением схемы:

>>> sample_schema = spark.read.json("sample_file.json").schema
>>> dfs = [spark.read.schema(sample_schema).json(file) for file in lst]
# all lengths are the same
>>> map(lambda df: len(df.schema), dfs)
[28, 28, 28]
# and union works
>>> udfs = reduce(lambda df1, df2: df1.union(df2), dfs)
...