Pyspark считывает выбранные файлы дат из хранилища иерархии дат - PullRequest
0 голосов
/ 12 февраля 2019

Я пытаюсь прочитать несколько CSV-файлов с помощью Pyspark, данные обрабатываются Amazon Kinesis Firehose, поэтому они записываются в следующем формате.

s3bucket/ 
    YYYY/
        mm/
            dd/
                hh/
                    files.gz
                    files.gz
                    files.gz

Я на самом деле использую этот код для чтения дляполный день (например, 15/01/2019) с регулярным выражением:

data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
    .load("s3://s3bucket/2019/01/15/*.gz".format(datetime_object.strftime("%Y/%m/%d")))

Мой вопрос: как я могу прочитать данные за несколько дней, зная нужные мне даты?Есть ли автоматический способ или я должен сделать регулярное выражение для нужных мне дат?

РЕДАКТИРОВАТЬ:
Что я ищу, так это обратная функция метода DataFrameWriter.partitionBy (* cols) в документацииниже
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=regex#pyspark.sql.DataFrameWriter

Ответы [ 2 ]

0 голосов
/ 14 февраля 2019

Я не нашел для него функции, однако это обходной путь:

datetime_object = datetime.strptime("2019-01-31", '%Y-%m-%d')
delta_days = 10
base_bucket = "s3://s3bucket/{}/*/*.gz"
bucket_names = []
for date in [datetime_object - timedelta(days=x) for x in range(0, delta_days)]:
    bucket_names.append(base_bucket.format(date.strftime("%Y/%m/%d")))

К счастью, функция .load() принимает список в качестве аргумента исходных путей, поэтому я генерирую каждый путьосновываясь на нужных мне датах и ​​передав их в функцию загрузки.

data = spark.read.format("csv").options(compression="GZIP") \
        .load(bucket_names)
0 голосов
/ 12 февраля 2019

Я волнуюсь, нет никакого способа сделать это.

Если ваши данные структурированы, как показано ниже (с месяцем =, год = ...), мы называем это разделом.

s3bucket/ 
    year=YYYY/
        month=mm/
            day=dd/
                hour=hh/
                    files.gz
                    files.gz
                    files.gz

И вы можете легко загрузить свои данные (в вашем случае по определенным дням)

data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) \
  .load("s3://s3bucket/")

data_days = data.filter("day in (10, 20)")

С разделом Spark загружает только ваши конкретные дни, а не все дни.

...