Запись в таблицу стала более длительной после многопоточности - PullRequest
0 голосов
/ 20 мая 2019

Я использую многопоточность для записи данных в таблицу SQL.Даже после использования нескольких потоков (10) процесс записи стал медленнее.Процесс записи стал медленнее, чем когда я пытался сделать это в одном потоке.Я пытаюсь сократить время записи до миллисекунд.Результатом записи становится медленнее, что данные не могут быть использованы следующим процессом для дальнейшей обработки.Время записи увеличивается до нескольких секунд.Я пытаюсь сделать несколько записей в базу данных.Мой фрейм данных обновляется каждые 250 мс.

import threading 
import pandas as pd
from ib_insync import *
util.startLoop()
import datetime
import urllib.request
from sqlalchemy import create_engine
import pyodbc
import numpy
from queue import Queue
import time
from apscheduler.schedulers.background import BackgroundScheduler
##Connections to Database
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params, echo=False,fast_executemany = True) #set echo = True to print all sql statements sent to server
conn = engine.connect()
ib = IB()
ib.connect('127.0.0.1', 4002, clientId=4)
symbolList =  pd.read_csv('st2018_symbols.csv')
symbolList = symbolList['RIC'].str.split('.',expand = True)[0].tolist()
symbolList = set(symbolList)
contracts = [Stock(symbol ,'SMART', 'USD') for symbol in symbolList]
ib.qualifyContracts(*contracts)

## Requesting Market Data  
for contract in contracts:
    ib.reqMktData(contract, '', False, False)

## Function to store the incoming ticks in a DataFrame and write that DataFrame to SQL table
def onPendingTickers(tickers):
    global df,df1,df2,df3,df4,df5,q
    for t in tickers:
        df.loc[t.contract.symbol] = (t.contract.symbol,
             t.bid,t.bidSize,t.ask,t.askSize,t.last,t.volume,t.time)    
#        clear_output(wait=True)
    df1['SystemTime'] = pd.Series([datetime.datetime.now()] * len(df),index = [c.symbol for c in contracts])                

    df2 = pd.concat([df,df1],axis = 1)
    df2['SystemTime'] = pd.to_datetime(df2['SystemTime'])
    df2.reset_index(drop=True, inplace=True)
    q.put(df2)

#    print(q.queue,'df2')
def write_toDB(conn):
    global df,df1,df2,df3,df4,df5,q

    while True:
        df2 = q.get()
#        print(df2,'df2')
        start = datetime.datetime.now()
        if len(df3) > 0:
            df4 =  df2[(df2['bid'].values <= df2['ask'].values) & (0.75*df3['bid'].values <= df2['bid'].values)  & (df2['bid']<= 1.25*df3['bid'].values) & (0.75*df3['ask'].values <= df2['ask'].values) & (df2['ask'] <= 1.25*df3['ask'].values)]    
            df4 = df4.dropna(how = 'any')
#            print(df4,'df4')  
            df4.to_sql('testTick',conn,if_exists = 'append',index= False,method = 'multi',chunksize = 230)       
    #        df2.to_csv('tickData.csv',header=False,index = False,mode = 'a' )
        else:
            df5 = df2.dropna(how = 'any')
            df5.to_sql('testTick',conn,if_exists = 'append',index= False,method = 'multi',chunksize = 230) 
#            print(df5,'df5')      
        df3 = df2
#        q1.put(df3)
        print('time diff',datetime.datetime.now() - start) 
        q.task_done()

def write_toMB():
    global df3
#    while True:
#        df3 = q1.get()

    start = datetime.datetime.now()
#    print(df3)
    cursor1.execute(''' TRUNCATE TABLE dariush.dbo.MB''')
    cnxn1.commit()
    df3.to_sql('MB',conn11,if_exists = 'append',index= False,method = 'multi',chunksize = 230) 
    print("Write time",datetime.datetime.now()-start)


if __name__ == "__main__": 
    # creating thread 
    q = Queue()
    ib.pendingTickersEvent += onPendingTickers

    workers = 10
    thread_list = []
    for i in range(workers):
        t = threading.Thread(target=write_toDB,args = [conn[i],])
        t.start()
        thread_list.append(t)


     ## Timer to keep the connection open to IB for inflow of data
    ib.sleep(23400)
    ib.pendingTickersEvent -= onPendingTickers
    ib.disconnect()
    for thread in thread_list:
        thread.join()
    print("Done!") 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...