Оптимизация потоковых SQL-запросов на Python - PullRequest
1 голос
/ 15 апреля 2019

Экспериментально выясняется, что многопоточность сотен «маленьких» SQL-запросов намного быстрее, чем выполнение одного большего запроса.

Код ниже работает, но мне интересно, есть ли у кого-нибудь советы по его оптимизации?При выполнении около 500 запросов, которые возвращают в общей сложности 10 миллионов строк, загрузка ЦП колеблется в пределах 90-100%.

0.Организация

# 1. Import standard modules. 
# 2. FUNCTION - Establish multiple db connections.
# 3. FUNCTION - Execute multiple queries using multiple db connections from #2. 
# 4. FUNCTION - Close db connections from #2. 
# 5. FUNCTION - Use #2 to establish multiple db connections, #3 to execute multiple queries, #4 to close db connections.

1.Импортные модули

import threading as th
import pyodbc
import pandas as pd
pyodbc.pooling = False

2.Время соединения с БД num_queries

def connect(connection_string , num_queries):

    connections , threads = [] , []

    def myfunc(i):
        connection = pyodbc.connect(connection_string)
        connections.append(connection)
    for i in range(num_queries):
        t = th.Thread(target=myfunc , args=(i,))
        threads.append(t)
    for t in threads:
        t.start() 
    for t in threads:
        t.join()

    return connections

3.Настройка потоковых функций для параллельного выполнения запросов

def concurrent(queries , connections):
    df , threads = [] , []
    num_queries = len(queries) 

    def myfunc(i):
        df.append(pd.read_sql_query(queries[i] , connections[i]))
    for i in range(num_queries):
        t = th.Thread(target=myfunc , args=(i,))
        threads.append(t)
    for t in threads:
        t.start()  
    for t in threads:
        t.join()    

    return pd.concat(df)

4.Закрыть соединения БД

def close(connections):
    threads = []

    def myfunc(i):
        i.close()
    for i in connections:
        t = th.Thread(target=myfunc , args=(i,))
        threads.append(t)   
    for t in threads:
        t.start()
    for t in threads:
        t.join() 
    for i in reversed(connections):
        connections.remove(i)

5.Стандартное восточное время.соединения, запуск запросов, закрытие соединений.Вернуть результаты.

def query(queries , connection_string):
    num_queries = len(queries)
    connections = connect(connection_string , num_queries)
    df = concurrent(queries , connections)
    close(connections)

    return df

6.Выполнить задание

if __name__ == "__main__":
    queries = ['SELECT * FROM TBL_1' , 'SELECT * FROM TBL_2' , ...]
    connection_string = 'DRIVER={SQL Server Native Client 11.0};SERVER = ...'
    query(queries , connection_string)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...