Как читать куски нескольких больших CSV-файлов из облачного хранилища Google, используя Dask, не перегружая память сразу - PullRequest
0 голосов
/ 30 июня 2019

Я пытаюсь прочитать кучу больших CSV-файлов (несколько файлов) из хранилища Google.Я использую библиотеку распространения Dask для параллельных вычислений, но проблема, с которой я здесь сталкиваюсь, заключается в том, что, хотя я упоминаю blockize (100mb) , я не уверен, как читать разделпо разделам и сохраните его в моей postgres базе данных, чтобы не перегружать память.

    from dask.distributed import Client
    from dask.diagnostics import ProgressBar
    client = Client(processes=False)
    import dask.dataframe as dd

    def read_csv_gcs():
      with ProgressBar():
        df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
        pd = df.compute(scheduler='threads')
        return pd

    def write_df_to_db(df):
      try:
        from sqlalchemy import create_engine
        engine = create_engine('postgresql://usr:pass@localhost:5432/sampledb')
        df.to_sql('sampletable', engine, if_exists='replace',index=False)
      except Exception as e:
        print(e)
        pass

    pd = read_csv_gcs()
    write_df_to_db(pd)

Приведенный выше код является моей базовой реализацией, но, как я уже сказал, я хотел быпрочитайте это в чанке и обновите базуЧто-то вроде

    df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
    for chunk in df:
       write_it_to_db(chunk)

Возможно ли сделать это в Dask? или я должен пойти на куски pandas и повторить их, а затем сохранить их в БД (но здесь я пропускаю параллельные вычисления)?

Может кто-нибудь пролить свет?

1 Ответ

1 голос
/ 30 июня 2019

Эта строка

df.compute(scheduler='threads')

гласит: загрузите данные порциями в рабочие потоки и объедините их все в один фрейм данных в памяти, df.Это не то, что вы хотели.Вы хотели вставить чанки по мере их поступления, а затем удалить их из памяти .

Возможно, вы хотели использовать map_partitions

df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
df.map_partitions(write_it_to_db).compute()

или использовать df.to_delayed().

Обратите внимание, что, в зависимости от вашего драйвера SQL, вы не сможете получить параллелизм таким образом, а если нет, то метод pandas iter-chunk сработает так же хорошо.

...