Чтение файлов в диапазонах \ гггг \ мм \ дд \ чч \ мм \ - PullRequest
0 голосов
/ 02 мая 2019

У меня есть приложение PySpark, которому нужно читать файлы из учетной записи хранилища BLOB-объектов Azure, где файлы разбиваются на папки каждые 5 минут в следующем формате:

\Root\yyyy\mm\dd\HH\MM\files.csv

У меня есть процесс, который запускается каждый часи хочет обработать все файлы с момента последнего запуска (который может быть дольше часа, если запуск был пропущен).Я управляю верхним водяным знаком, который сообщает мне время последней обработки папки.

Внутри файла также есть поле даты и времени, которое соответствует пути даты и времени (с более подробной информацией во втором).

Примечаниечто я не могу изменить структуру папок, чтобы Sparks предпочитал метод разделения года = гггг \ месяц = ​​мм и т. д.

Я написал эту функцию:

from datetime import datetime

def folderDateTimeRange(startDateTime, endDateTime, levels=5):
      if startDateTime.year != endDateTime.year:
        return '/{*}' * levels
      elif startDateTime.month != endDateTime.month:
        return datetime.strftime(startDateTime, '%Y')  + '/{*}' * (levels - 1)
      elif startDateTime.day != endDateTime.day:
        return datetime.strftime(startDateTime, '%Y/%m')  + '/{*}' * (levels - 2)
      elif startDateTime.hour != endDateTime.hour:
        return datetime.strftime(startDateTime, '%Y/%m/%d')  + '/{*}' * (levels - 3)
      else:
        return ""

Это ограничивает количество папокчитайте в большинстве случаев.Мне все еще нужно отфильтровать, что данные читаются по тем же временам начала и окончания, которые передаются в функцию, потому что 23:00 - 01:00 следующего дня вернет {*} в части дня и часа - следовательно, я думаю, что это можетбыть более эффективным.

В худшем примере вы передаете start = 2018-12-31 22:00:00 и end = 2019-01-01 01:00:00 - это приводит к получению всех данных за все годыбыть прочитанным.

Мои знания о глобусах ограничены - но возможно ли передать диапазон вместо {*}?

1 Ответ

1 голос
/ 09 мая 2019

Да, вы можете использовать фигурные скобки, чтобы вернуть список элементов, или вы можете использовать регулярное выражение.

Проверьте здесь: Чтение диапазона файлов в pySpark и здесь: pyspark выбирает подмножество файлов с помощью regex / glob из s3 (я не уверен, насколько Azure и S3 различаются, но я предполагаю, что PySpark может абстрагировать это; исправьте меня, если я ошибаюсь.)

Вы также можете свести к минимуму «трату» чтения файлов, создав несколько путей и отправив их вместо одного пути (это гарантирует, что у вас не будет той же ошибки при чтении данных за два года, если вы переходите с одного годак следующему.)

Ради интереса я написал небольшой код с некоторыми тестовыми материалами внизу, вы, вероятно, можете вернуть эти списки и получить то, что вы хотите:

from datetime import datetime as dt
from datetime import timedelta
from collections import defaultdict
# \Root\yyyy\mm\dd\HH\MM\files.csv


def folderDateTimeRange(start, end, levels=5):
    start_iter = start
    paths = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
    while start_iter < end:
        paths[start_iter.year][start_iter.month][start_iter.day][start_iter.hour].append(start_iter.minute)
        start_iter += timedelta(minutes=5)

    ret_paths = []
    for year, v1 in paths.items():
        path = '{}\\'.format(year)
        for month, v2 in v1.items():
            path += '{}\\'.format(month)
            for day, v3 in v2.items():
                path += '{}\\'.format(day)
                path += '{{{}}}\\{{*}}'.format(','.join([str(_) for _ in v3.keys()]))
        ret_paths.append(path)

    return ret_paths


def test(a, b):
    res = folderDateTimeRange(a, b)
    for r in res:
        print(r)
    print('---')


test(dt(2018, 1, 1), dt(2018, 1, 2))
test(dt(2018, 12, 31), dt(2019, 1, 2))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...