У меня довольно 10+ ГБ CSV-файл, и я пробовал разные способы манипулирования данными внутри. Я использовал блоки dask и pandas, но просмотр данных лучше всего просматривать в нашей программе для просмотра БД. Так что я могу сделать следующее
j = 0
index_start = 1
start = dt.datetime.now()
disk_engine = create_engine('sqlite:///311_20M_trial2.db')
for df in pd.read_csv(R'C:\Users\akhan37\Desktop\test\test\311_Service_Requests_from_2010_to_Present.csv', chunksize = 50000, iterator=True, encoding='ANSI'):
df = df.rename(columns = {col: col.lower().replace(' ','_') for col in df.columns})
df.index += index_start
j+=1
df.to_sql('data', disk_engine, if_exists='append')
index_start = df.index[-1] + 1
print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*50000))
Это делает работу, но занимает около 30 минут.
Я полагаю, что могу многопоточность
from multiprocessing import Queue
import threading
def make_db(df):
df.to_sql('data', disk_engine, if_exists='append')
index_start = df.index[-1] + 1
return 'Done'
start = dt.datetime.now()
chunksize = 50000
j = 0
index_start = 1
disk_engine = create_engine('sqlite:///311_20M_trial2.db')
for df in pd.read_csv('311_Service_Requests_from_2010_to_Present.csv', chunksize=chunksize, iterator=True, encoding='ANSI'):
df = df.rename(columns = {col: col.lower().replace(' ','_') for col in df.columns})
df.index += index_start
j+=1
if __name__ == '__main__':
q = Queue()
for i in range(5):
lock = threading.Lock()
t= threading.Thread(target = make_db, args= df)
print(lock)
#time.sleep(1)
for job in range(10):
q.put(job)
print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunksize))
Этот код выглядит так, как будто он работает, но он не сохраняет базу данных где-либо также.
%%timeit
def main2():
q = Queue()
for x in range(8):
lock = threading.Lock()
worker = threading.Thread(target = make_db)
print(lock)
worker.daemon = True
worker.start()
q.put(worker)
q.join()
if __name__ == '__main__':
main2()
Короче, моя проблема: я хочу, чтобы второй блок кода работал правильно, и у меня возникают проблемы с пониманием того, как предполагается использовать многопоточность ... Данные можно загрузить здесь