Эффективный способ прочитать 15 M строк CSV-файлов в Python - PullRequest
15 голосов
/ 01 июля 2019

Для моего приложения мне нужно прочитать несколько файлов по 15 М строк в каждом, сохранить их в DataFrame и сохранить DataFrame в формате HDFS5.

Я уже пробовал разные подходы, в частности, pandas.read_csv со спецификациями chunksize и dtype и dask.dataframe.Обе они занимают около 90 секунд для обработки одного файла, и поэтому я хотел бы знать, есть ли способ эффективно обработать эти файлы описанным способом.Далее я покажу некоторый код тестов, которые я провел.

import pandas as pd
import dask.dataframe as dd
import numpy as np
import re 

# First approach
store = pd.HDFStore('files_DFs.h5')

chunk_size = 1e6

df_chunk = pd.read_csv(file,
                sep="\t",
                chunksize=chunk_size,
                usecols=['a', 'b'],
                converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                            "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
                skiprows=15
           )              
chunk_list = [] 


for chunk in df_chunk:
      chunk_list.append(chunk)


df = pd.concat(chunk_list, ignore_index=True)

store[dfname] = df
store.close()

# Second approach

df = dd.read_csv(
        file,
        sep="\t",
        usecols=['a', 'b'],
        converters={"a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                    "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x))},
        skiprows=15
     )
store.put(dfname, df.compute())
store.close()

Вот как выглядят файлы (пробел состоит из буквальной вкладки):

a   b
599.998413  14.142895
599.998413  20.105534
599.998413  6.553850
599.998474  27.116098
599.998474  13.060312
599.998474  13.766775
599.998596  1.826706
599.998596  18.275938
599.998718  20.797491
599.998718  6.132450)
599.998718  41.646194
599.998779  19.145775

Ответы [ 2 ]

6 голосов
/ 03 июля 2019

Сначала давайте ответим на заголовок вопроса

1- Как эффективно прочитать 15M строк CSV-файла с плавающей запятой

Я предлагаю вам использовать модин :

Генерация данных образца:

import modin.pandas as mpd
import pandas as pd
import numpy as np

frame_data = np.random.randint(0, 10_000_000, size=(15_000_000, 2)) 
pd.DataFrame(frame_data*0.0001).to_csv('15mil.csv', header=False)
!wc 15mil*.csv ; du -h 15mil*.csv

    15000000   15000000  480696661 15mil.csv
    459M    15mil.csv

Теперь к тестам:

%%timeit -r 3 -n 1 -t
global df1
df1 = pd.read_csv('15mil.csv', header=None)
    9.7 s ± 95.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
    3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
(df2.values == df1.values).all()
    True

Итак, как мы видим, модин был приблизительно в 3 раза быстрее в моей установке.


Теперь, чтобы ответить на вашу конкретную проблему

2 - Очистка CSV-файла, содержащего нечисловые символы, и затем чтение его

Как уже отмечалось, ваше узкое место, вероятно, является конвертером. Вы называете эти лямбды 30 миллионов раз. Даже затраты на вызов функции в этом масштабе становятся нетривиальными.

Давайте атаковать эту проблему.

Создание грязного набора данных:

!sed 's/.\{4\}/&)/g' 15mil.csv > 15mil_dirty.csv

Подходы

Сначала я попытался использовать модин с аргументом конвертеров. Затем я попробовал другой подход, который вызывает регулярное выражение меньше раз:

Сначала я создам объект в виде файла, который фильтрует все через ваше регулярное выражение:

class FilterFile():
    def __init__(self, file):
        self.file = file
    def read(self, n):
        return re.sub(r"[^\d.,\n]", "", self.file.read(n))
    def write(self, *a): return self.file.write(*a) # needed to trick pandas
    def __iter__(self, *a): return self.file.__iter__(*a) # needed

Затем мы передаем его пандам в качестве первого аргумента в read_csv:

with open('15mil_dirty.csv') as file:
    df2 = pd.read_csv(FilterFile(file))

Тесты:

%%timeit -r 1 -n 1 -t
global df1
df1 = pd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    2min 28s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df2
df2 = mpd.read_csv('15mil_dirty.csv', header=None,
        converters={0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))}
           )
    38.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df3
df3 = pd.read_csv(FilterFile(open('15mil_dirty.csv')), header=None,)
    1min ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Похоже, модин снова побеждает! К сожалению, в modin еще не реализовано чтение из буферов, поэтому я разработал ULTIMATE APPROACH:

%%timeit -r 1 -n 1 -t
with open('15mil_dirty.csv') as f, open('/dev/shm/tmp_file', 'w') as tmp:
    tmp.write(f.read().translate({ord(i):None for i in '()'}))
df4 = mpd.read_csv('/dev/shm/tmp_file', header=None)
    5.68 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Используется translate, что значительно быстрее, чем re.sub, а также /dev/shm - файловая система в памяти, которую обычно предоставляет ubuntu (и другие linux). Любой записанный там файл никогда не попадет на диск, поэтому он быстрый. Наконец, он использует modin для чтения файла, обойдя ограничение буфера modin. Этот подход примерно в 30 раз быстрее , чем ваш, и также довольно прост.

2 голосов
/ 03 июля 2019

Что ж, мои выводы не сильно связаны с пандами, а скорее с некоторыми распространенными ошибками.

Your code: 
(genel_deneme) ➜  derp time python a.py
python a.py  38.62s user 0.69s system 100% cpu 39.008 total
  1. предварительно скомпилируйте свое регулярное выражение
Replace re.sub(r"[^\d.]", "", x) with precompiled version and use it in your lambdas
Result : 
(genel_deneme) ➜  derp time python a.py 
python a.py  26.42s user 0.69s system 100% cpu 26.843 total
  1. Попробуйте найти лучший способ, чем напрямую, используя np.float32, поскольку он в 6-10 раз медленнее, чем вы думаете. Следующее не то, что вы хотите, но я просто хочу показать проблему здесь.
replace np.float32 with float and run your code. 
My Result:  
(genel_deneme) ➜  derp time python a.py
python a.py  14.79s user 0.60s system 102% cpu 15.066 total

Найдите другой способ достижения результата с помощью поплавков. Подробнее по этому вопросу https://stackoverflow.com/a/6053175/37491

  1. Разделите ваш файл и работу на подпроцессы, если можете. Вы уже работаете с отдельными кусками постоянного размера. Таким образом, в основном вы можете разделить файл и обрабатывать работу в отдельных процессах, используя многопроцессорность или потоки.
...