Я создаю сценарий, который получает данные из потока JSON / REST, а затем добавляет их в базу данных.Я хотел бы создать буфер, который собирает данные из потока и сохраняет их, пока они не будут успешно вставлены в базу данных.
Идея состоит в том, что один поток будет передавать данные из API в фрейм данных, а другой поток будет пытаться отправить данные в базу данных, удаляя элементы из фрейма данных после их успешной вставки в базу данных..
Я написал следующий код для проверки концепции - единственная проблема в том, что она не работает!
import threading
from threading import Thread
import pandas as pd
import numpy as np
import time
from itertools import count
# set delay
d=5
# add items to dataframe every few seconds
def appender():
testdf = pd.DataFrame([])
print('starting streamsim')
for i in count():
testdf1 = pd.DataFrame(np.random.randint(0,100,size=(np.random.randint(0,25), 4)), columns=list('ABCD'))
testdf = testdf.append(testdf1)
print('appended')
print('len is now {0}'.format(len(testdf)))
return testdf
time.sleep(np.random.randint(0,5))
# combine the dfs, and operate on them
def dumper():
print('starting dumpsim')
while True:
# check if there are values in the df
if len(testdf.index) > 0:
print('searching for values')
for index, row in testdf.iterrows():
if row['A'] < 10:
testdf.drop(index, inplace=True)
print('val dropped')
else:
print('no vals found')
# try to add rows to csv to simulate sql insert command
for index, row in testdf.iterrows():
# try to append to csv
try:
with open('footest.csv', 'a') as f:
row.to_csv(f, mode= 'a', header=True)
except:
print('append operation failed, skipping')
pass
#if operation succeeds, drop the row
else:
testdf.drop(index)
print('row dropped after success')
if len(testdf.index) == 0:
print('df is empty')
pass
time.sleep(d)
if __name__ == '__main__':
Thread(target = appender).start()
Thread(target = dumper).start()
Есть ли способ заставить эту работу?Или DataFrame «заблокирован», когда над ним работает один поток?