Распараллеливание процесса обработки больших файлов CSV - PullRequest
0 голосов
/ 22 февраля 2019

РЕДАКТИРОВАТЬ: Решение Сначала я разбил мои файлы на 10000 строк, используя функцию split bash.Затем:

with Pool(processes=32) as pool:
    for level in range(75):
        all_results=[]
        for f in level_dir:
            res = pool.apply_async(process_file, args=[f, level]
            all_results.append(res)
        for res in all_results:
            res.get()
        save_matrix()

Я работаю с несколькими действительно большими файлами CSV (74 из них, от 10 ГБ до 65 ГБ), и мне нужно читать их построчно, чтобы извлечь из них данные и поместить их в матрицу.

Я использую Python, так как синтаксический анализ файлов CSV с пустыми полями, а также полей JSON не так прост, как использование C / C ++.

Сейчас я делаю то, чтоиспользуйте ThreadPool, но он не выглядит так, как будто он использует процессор на полную мощность (Xeon E5), и я думаю, что это может быть из-за заполнения матрицы.

M = np.zeros((users.size, levels.size, 2))

def process_line(row):
    data    = json.loads(row[3])
    usr     = data['usr']
    #compute stuff
    M[usr, level, 0] = score
    M[usr, level, 1] = t_len

def main():
    for level in range(75):
        csv_f  = open("level{}.csv".format(level))
        reader = csv.reader(csv_f)
        t      = ThreadPool(processes=32)
        for row in reader:
            t.map(process_line, (row, level, ))
        t.join()
        t.close()
    np.save("matrix.npy", M)

Когда я печатаю отметки временипри каждом строковом процессе изменение количества процессов ничего не меняет, оно происходит так же медленно, как и при отсутствии ThreadPool.

Что можно сделать, чтобы мой код работал быстрее?

Если я продолжу так делать, буквально на это уйдет 3 месяца.

1 Ответ

0 голосов
/ 22 февраля 2019

Вы можете начать использовать pandas, открывая каждый файл как df = pd.read_csv("level{}.csv".format(level)), а затем выбрать столбцы (пусть говорит col1, col2, ...) и извлечь матрицу значений через mat = df[["col1", "col2"]].values

* 1007.* Учитывая размер ваших файлов, я предлагаю вам использовать dask.dataframe для обработки каждого отдельного файла и сохранения матрицы в хорошем формате.Затем вы можете обработать матрицу, используя dask.array
...