Как мне эффективно написать несколько CSV-файлов, используя dask.dataframe? - PullRequest
0 голосов
/ 15 сентября 2018

Вот краткое изложение того, что я делаю:

Сначала я делаю это с помощью обычной многопроцессорной системы и пакета панд:

Шаг 1. Получить список имен файлов, которые я собираюсь прочитать

import os    
files = os.listdir(DATA_PATH + product)

Шаг 2. цикл по списку

from multiprocessing import Pool
import pandas as pd    

def readAndWriteCsvFiles(file):
    ### Step 2.1 read csv file into dataframe 
    data = pd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False)

    ### Step 2.2 do some calculation
    ### .......

    ### Step 2.3 write the dataframe to csv to another folder
    data.to_csv("another folder/"+file)

if __name__ == '__main__':
    cl = Pool(4)
    cl.map(readAndWriteCsvFiles, files, chunksize=1)
    cl.close()
    cl.join()  

Код работает нормально, но очень медленно.

Для выполнения задачи требуется около 1000 секунд.

Сравните с программой R, используя функции library(parallel) и parSapply.

Программа R занимает всего около 160 секунд.

Итак, я попытался с dask.delayed и dask.dataframe со следующим кодом:

Шаг 1. Получить список имен файлов, которые я собираюсь прочитать

import os    
files = os.listdir(DATA_PATH + product)

Шаг 2. цикл по списку

from dask.delayed import delayed
import dask.dataframe as dd
from dask import compute

def readAndWriteCsvFiles(file):
    ### Step 2.1 read csv file into dataframe 
    data = dd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False, assume_missing=True)

    ### Step 2.2 do some calculation
    ### .......

    ### Step 2.3 write the dataframe to csv to another folder
    data.to_csv(filename="another folder/*", name_function=lambda x: file)

compute([delayed(readAndWriteCsvFiles)(file) for file in files])

На этот раз я обнаружил, что если я закомментировал шаг 2.3 в коде dask и коде панд, dask будет работать намного быстрее, чем обычные панды и многопроцессорная обработка.

Но если я вызову метод to_csv, тогда dask будет таким же медленным, как pandas.

Есть решение?

Спасибо

1 Ответ

0 голосов
/ 15 сентября 2018

Чтение и запись файлов CSV часто связаны с GIL. Возможно, вы захотите попробовать распараллелить с процессами, а не с потоками (по умолчанию для dask с задержкой).

Этого можно добиться, добавив ключевое слово scheduler='processes' к вашему вычислительному вызову.

compute([delayed(readAndWriteCsvFiles)(file) for file in files], scheduler='processes')

См. документация по планированию для получения дополнительной информации

Также обратите внимание, что здесь вы не используете dask.dataframe, а скорее dask.delayed.

...