Условная загрузка разделов из файловой системы - PullRequest
0 голосов
/ 03 августа 2020

Мне известно, что были вопросы относительно подстановочных знаков в .load() -функции pySparks, например здесь или здесь . Во всяком случае, ни один из вопросов / ответов, которые я нашел, не касался моего варианта этого.

Context

В pySpark я хочу загружать файлы непосредственно из HDFS, потому что мне нужно использовать avro-библиотеку databricks для Spark 2.3.x. Я делаю это так:

partition_stamp = "202104"

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={partition_stamp}*") \
        .select("...")

Как вы можете видеть, разделы создаются из меток времени в формате yyyyMMdd.

Вопрос

В настоящее время я только получить все использованные перегородки на апрель 2021 года (partition_stamp = "202104"). Однако мне нужны все разделы, начиная с апреля 2021 года.

Написано в псевдокоде, мне понадобится решение, подобное этому:

.load(f"/path/partition >= {partition_stamp}*")

Поскольку на самом деле существует несколько сотен разделов, это бесполезно делать это каким-либо образом, требующим жесткого кодирования.


Итак, мой вопрос: Есть ли функция для условной загрузки файлов?

1 Ответ

0 голосов
/ 03 августа 2020

Как я узнал, существуют только следующие параметры для динамической обработки путей внутри функции .load():

*:  Wildcard for any character or sequence of characters until the end of the line or a new sub-directory ('/') -> (/path/20200*)
[1-3]: Regex-like inclusion of a defined character-range -> (/path/20200[1-3]/...)
{1,2,3}: Set-like inclusion of a defined set of characters -> (/path/20200{1,2,3}/...)

Таким образом, чтобы ответить на мой вопрос: Нет встроенной функции для условной загрузки файла.



В любом случае, я хочу предоставить вам свое решение:

import pandas as pd # Utilize pandas date-functions

partition_stamp = ",".join((set(
                        str(_range.year) + "{:02}".format(_range.month) 
                        for _range in pd.date_range(start=start_date, end=end_date, freq='D')
                 )))

df = spark.read.format("com.databricks.spark.avro") \
        .load(f"/path/partition={{{partition_stamp}}}*") \
        .select("...")

Таким образом, ограничение для метки времени в формате yyyyMM генерируется динамически для заданной даты начала и окончания и .load() на основе строк все еще можно использовать.

...