многопоточный pd чанк к sql алхимии - PullRequest
0 голосов
/ 19 марта 2020

У меня довольно 10+ ГБ CSV-файл, и я пробовал разные способы манипулирования данными внутри. Я использовал блоки dask и pandas, но просмотр данных лучше всего просматривать в нашей программе для просмотра БД. Так что я могу сделать следующее

j = 0
index_start = 1
start = dt.datetime.now()
disk_engine = create_engine('sqlite:///311_20M_trial2.db')
for df in pd.read_csv(R'C:\Users\akhan37\Desktop\test\test\311_Service_Requests_from_2010_to_Present.csv', chunksize = 50000, iterator=True, encoding='ANSI'):
    df = df.rename(columns = {col: col.lower().replace(' ','_') for col in df.columns})
    df.index += index_start
    j+=1
    df.to_sql('data', disk_engine, if_exists='append')
    index_start = df.index[-1] + 1
    print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*50000))

Это делает работу, но занимает около 30 минут.

Я полагаю, что могу многопоточность

from multiprocessing import Queue
import threading
def make_db(df):
    df.to_sql('data', disk_engine, if_exists='append')
    index_start = df.index[-1] + 1
    return 'Done'
start = dt.datetime.now()
chunksize = 50000
j = 0
index_start = 1
disk_engine = create_engine('sqlite:///311_20M_trial2.db')
for df in pd.read_csv('311_Service_Requests_from_2010_to_Present.csv', chunksize=chunksize, iterator=True, encoding='ANSI'):
    df = df.rename(columns = {col: col.lower().replace(' ','_') for col in df.columns})
    df.index += index_start
    j+=1
    if __name__ == '__main__':
        q = Queue()
        for i in range(5):
            lock = threading.Lock()
            t= threading.Thread(target = make_db, args= df)
            print(lock)
            #time.sleep(1)
        for job in range(10):
            q.put(job)
    print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunksize))

Этот код выглядит так, как будто он работает, но он не сохраняет базу данных где-либо также.

%%timeit
def main2():
    q = Queue()
    for x in range(8):
        lock = threading.Lock()
        worker = threading.Thread(target = make_db)

        print(lock)
        worker.daemon = True
        worker.start()
        q.put(worker)
    q.join()
if __name__ == '__main__':
    main2()

Короче, моя проблема: я хочу, чтобы второй блок кода работал правильно, и у меня возникают проблемы с пониманием того, как предполагается использовать многопоточность ... Данные можно загрузить здесь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...