Я пытаюсь прочитать кучу больших CSV-файлов (несколько файлов) из хранилища Google.Я использую библиотеку распространения Dask для параллельных вычислений, но проблема, с которой я здесь сталкиваюсь, заключается в том, что, хотя я упоминаю blockize (100mb) , я не уверен, как читать разделпо разделам и сохраните его в моей postgres базе данных, чтобы не перегружать память.
from dask.distributed import Client
from dask.diagnostics import ProgressBar
client = Client(processes=False)
import dask.dataframe as dd
def read_csv_gcs():
with ProgressBar():
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
pd = df.compute(scheduler='threads')
return pd
def write_df_to_db(df):
try:
from sqlalchemy import create_engine
engine = create_engine('postgresql://usr:pass@localhost:5432/sampledb')
df.to_sql('sampletable', engine, if_exists='replace',index=False)
except Exception as e:
print(e)
pass
pd = read_csv_gcs()
write_df_to_db(pd)
Приведенный выше код является моей базовой реализацией, но, как я уже сказал, я хотел быпрочитайте это в чанке и обновите базуЧто-то вроде
df = dd.read_csv('gs://mybucket/renish/*.csv', blocksize=100e6)
for chunk in df:
write_it_to_db(chunk)
Возможно ли сделать это в Dask? или я должен пойти на куски pandas и повторить их, а затем сохранить их в БД (но здесь я пропускаю параллельные вычисления)?
Может кто-нибудь пролить свет?