Вот краткое изложение того, что я делаю:
Сначала я делаю это с помощью обычной многопроцессорной системы и пакета панд:
Шаг 1. Получить список имен файлов, которые я собираюсь прочитать
import os
files = os.listdir(DATA_PATH + product)
Шаг 2. цикл по списку
from multiprocessing import Pool
import pandas as pd
def readAndWriteCsvFiles(file):
### Step 2.1 read csv file into dataframe
data = pd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False)
### Step 2.2 do some calculation
### .......
### Step 2.3 write the dataframe to csv to another folder
data.to_csv("another folder/"+file)
if __name__ == '__main__':
cl = Pool(4)
cl.map(readAndWriteCsvFiles, files, chunksize=1)
cl.close()
cl.join()
Код работает нормально, но очень медленно.
Для выполнения задачи требуется около 1000 секунд.
Сравните с программой R, используя функции library(parallel)
и parSapply
.
Программа R занимает всего около 160 секунд.
Итак, я попытался с dask.delayed и dask.dataframe со следующим кодом:
Шаг 1. Получить список имен файлов, которые я собираюсь прочитать
import os
files = os.listdir(DATA_PATH + product)
Шаг 2. цикл по списку
from dask.delayed import delayed
import dask.dataframe as dd
from dask import compute
def readAndWriteCsvFiles(file):
### Step 2.1 read csv file into dataframe
data = dd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False, assume_missing=True)
### Step 2.2 do some calculation
### .......
### Step 2.3 write the dataframe to csv to another folder
data.to_csv(filename="another folder/*", name_function=lambda x: file)
compute([delayed(readAndWriteCsvFiles)(file) for file in files])
На этот раз я обнаружил, что если я закомментировал шаг 2.3 в коде dask и коде панд, dask будет работать намного быстрее, чем обычные панды и многопроцессорная обработка.
Но если я вызову метод to_csv, тогда dask будет таким же медленным, как pandas.
Есть решение?
Спасибо