Как я могу использовать параллелизм для переноса базы данных в Python? - PullRequest
1 голос
/ 30 января 2020

У меня есть скрипт, используемый для переноса данных из SQLite в Postgres. Я просто использую для l oop для передачи таблиц одну за другой. Теперь я хочу поэкспериментировать с параллельной передачей нескольких таблиц, используя потоки, многопроцессорность или асинхронность, чтобы ускорить выполнение программы и сравнить время выполнения между этими способами. Как вы делаете один из этих способов?

Вот мой сценарий:

import psycopg2, sqlite3, sys
import time
import multiprocessing



sqdb="C://Users//duongnb//Desktop//Python//SqliteToPostgreFull//testmydb6.db"
sqlike="table"
pgdb="testmydb11"
pguser="postgres"
pgpswd="1234"
pghost="127.0.0.1"
pgport="5432"


consq=sqlite3.connect(sqdb)
cursq=consq.cursor()

tabnames=[]
print() 
cursq.execute('SELECT name FROM sqlite_master WHERE type="table" AND name LIKE "%table%";')
tabgrab = cursq.fetchall()
for item in tabgrab:
    tabnames.append(item[0])
print(tabgrab)

def copyTable(table):
        print(table)
        cursq.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name = ?;", (table,))
        create = cursq.fetchone()[0]
        cursq.execute("SELECT * FROM %s;" %table)
        rows=cursq.fetchall()
        colcount=len(rows[0])
        pholder='%s,'*colcount
        newholder=pholder[:-1]

        try:

            conpg = psycopg2.connect(database=pgdb, user=pguser, password=pgpswd,
                                host=pghost, port=pgport)
            curpg = conpg.cursor()
            curpg.execute("DROP TABLE IF EXISTS %s;" %table)
            create = create.replace("AUTOINCREMENT", "")
            curpg.execute(create)
            curpg.executemany("INSERT INTO %s VALUES (%s);" % (table, newholder),rows)
            conpg.commit()

            if conpg:
                conpg.close()

        except psycopg2.DatabaseError as e:
            print ('Error %s' % e) 
            sys.exit(1)

        finally:
            print("Complete")    

consq.close()

if __name__ == "__main__":
    start_time = time.time()
    for table in tabnames:
        p = multiprocessing.Process(target = copyTable, args = (table))
        p.start()
    for table in tabnames:
        p.join()
    print("All processes finished.")      

    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

1 Ответ

0 голосов
/ 30 января 2020

Вы должны поместить внутреннюю часть for table in tabnames в функцию, скажем, copyTable. Тогда вы сможете использовать пакет multiprocessing для распараллеливания вашего кода. Это должно выглядеть примерно так:

for table in tabnames:
    p = multiprocessing.Process(target = copyTable, args = (table))
    p.start()
for table in tabnames:
    p.join()
print("All processes finished.")

Но вы можете ускорить свой код еще больше, если вы используете COPY (https://www.postgresql.org/docs/current/sql-copy.html) вместо множества INSERT команды.

Вместо модуля multiprocessing вы также можете использовать модуль threading, который работает аналогично. Тогда у вас есть потоки вместо процессов. Из-за блокировки интерпретатора я ожидал бы худшую производительность с этим.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...