Найти последнюю версию каждой строки, используя Dask с файлами Parquet и partition_on? - PullRequest
0 голосов
/ 18 марта 2020

Как я могу убедиться, что могу сохранить самую последнюю версию строки (на основе уникальных ограничений) с помощью Dask с использованием файлов Parquet и partition_on?

Основным вариантом использования c является что я хочу запросить базу данных для всех строк where updated_at > yesterday и разбить данные на основе created_at_date (это означает, что может быть несколько дат, которые были обновлены, и эти файлы, скорее всего, уже существуют).

└───year=2019
    └───month=2019-01
            2019-01-01.parquet
            2019-01-02.parquet

Поэтому я хочу иметь возможность комбинировать мои новые результаты из последнего запроса и старые результаты на диске, а затем сохранять последнюю версию каждой строки (id столбец).





В настоящее время у меня есть операторы Airflow, обрабатывающие следующие логи c с Pandas, и это достигает моей цели. Я надеялся выполнить то же самое с Dask sh без особого кода: столбец месяца для создания файлов 2019-01-01.parquet или 2019-12.parquet

Пример:

df_dict = {k: v for k, v in df.groupby(partition_columns)}
L oop через каждый раздел и проверьте, существует ли имя файла. Если файл с таким именем уже существует, считайте этот файл как отдельный фрейм данных и объедините два фрейма данных

Пример:

part = df_dict[partition]
part= pd.concat([part, existing], sort=False, ignore_index=True, axis='index')
Сортировка кадров данных и удаление дубликатов на основе списка указанных столбцов (уникальные ограничения, отсортированные по столбцам file_modified_timestamp или updated_at, обычно для сохранения последней версии каждой строки)

Пример:

part = part.sort_values([sort_columns], ascending=True).drop_duplicates(unique_constraints, keep='last')

Конечным результатом является то, что мой многораздельный файл (2019-01-01.parquet) теперь обновлен до последних значений.

1 Ответ

0 голосов
/ 18 марта 2020

Я не могу придумать, как использовать существующие методы паркетной обработки для фрейма данных, чтобы делать то, что вам нужно, но, предполагая, что ваш фрейм данных dask разделен разумно, вы можете сделать точно такой же набор шагов в пределах map_partitions вызов. Это означает, что вы передаете составляющие pandas кадры данных в функцию, которая воздействует на них. Пока данные в каждом разделе не перекрываются, все будет в порядке.

...