Сохранение файла CSV как можно большего числа файлов CSV на основе значений столбцов с помощью Dask - PullRequest
1 голос
/ 03 февраля 2020

У меня большой CSV-файл, и предположим, что он выглядит следующим образом

ID,PostCode,Value
H1A0A1-00,H1A0A1,0
H1A0A1-01,H1A0A1,0
H1A0A1-02,H1A0A1,0
H1A0A1-03,H1A0A1,0
H1A0A1-04,H1A0A1,1
H1A0A1-05,H1A0A1,0
H1A1G7-0,H1A1G7,0
H1A1G7-1,H1A1G7,0
H1A1G7-2,H1A1G7,0
H1A1N6-00,H1A1N6,0
H1A1N6-01,H1A1N6,0
H1A1N6-02,H1A1N6,0
H1A1N6-03,H1A1N6,0
H1A1N6-04,H1A1N6,0
H1A1N6-05,H1A1N6,0
...

Я хочу разделить его по значениям PostCode и сохранить все строки с тем же почтовым индексом, что и CSV. Я пытался

postals = data['PostCode'].unique()
for p in postals:
    df = data[data['PostCode'] == p]
    df.to_csv(directory + '/output/demographics/' + p + '.csv', header=False, index=False)

Есть ли способ сделать это, используя Dask для использования многопроцессорности? Спасибо

Ответы [ 2 ]

3 голосов
/ 04 февраля 2020

В случае, если вы хотите сохранить паркет, это довольно просто

Паркет

import dask.dataframe as dd
import pandas as pd
import os 

fldr = 'data_pq'
data.to_parquet(fldr, partition_on="PostCode")

это сохранение данных для каждого почтового индекса в папке с именем PostCode=xxxxxxx, которая содержит столько файлов, сколько количество разделов вашего dask.dataframe.

CSV

Здесь я предлагаю вам использовать пользовательскую функцию write_file.

import dask.dataframe as dd
import pandas as pd
import os 

fldr = "data_csv"
os.makedirs(fldr, exist_ok=True)


def write_file(grp):
    pc = grp["PostCode"].unique()[0]
    grp.to_csv(f"{fldr}/{pc}.csv",
               header=False,
               index=False)
    return None


data.groupby("PostCode")\
    .apply(write_file, meta=('x', 'f8'))\
    .compute()

# the same function works for pandas df too
# data.groupby("PostCode").apply(write_file)

Вы должны проверить, как она работает производительность и в конечном итоге играть с планировщик .

0 голосов
/ 03 февраля 2020

Dask DataFrame .to_csv метод создаст один файл на раздел вашего набора данных. Таким образом, вы можете позволить Dask создавать CSV для вас, перераспределяя ваши данные и устанавливая столбец разделения в качестве индекса:

num_partitions = data['PostCode'].nunique().compute()
data = data.repartition(npartitions=num_partitions)
data = data.set_index('PostCode')
data.to_csv('/path/to/data/export-*.csv')

Обратите внимание, что если у вас очень большой набор данных, переразбиение и установка индекса может быть дорогой, и вам может быть лучше, если вы будете перебирать каждый почтовый код, как вы изначально предложили.

...