Как поддерживать вложенные параллельные соединения с MySQL из Python - PullRequest
2 голосов
/ 13 мая 2019

У нас есть небольшая программа на python, которая сортирует файлы из каталога, который постоянно заполняется, и соответственно загружает его в нужную таблицу MySQL.

Существует семь таблиц, и к каждой из них программа python поддерживает 5 открытых соединений (используя инструмент DBUtils.PooledDB).

Я хочу отправлять файлы через все 35 (7x5) соединений параллельно, но, хотя многопроцессорная обработка python с использованием map позволяет мне делать это на одном уровне (пять соединений с одной таблицей), родительский процесс должен выполняться последовательно ( пройдя через семь таблиц).

Я использую multiprocessing.Pool.map_async (), чтобы запустить весь процесс, поэтому все 35 подключений активны в начале, но после завершения каждой таблицы остальные таблицы должны ждать завершения всего пула.

Я хочу иметь возможность запускать все 35 процессов отдельно, чтобы они могли продолжать работать независимо друг от друга.

from multiprocessing import Pool
import pymysql
from DBUtils.PooledDB import PooledDB

# Pool of 35 connections
g_pool_0 = PooledDB(creator=pymysql, maxconnections=5 host=host, user=user db=db)
...
g_pool_6 = PooledDB(creator=pymysql, maxconnections=5 host=host, user=user db=db)

def insert_to_db(filename):
    # Load filename data into the database according to the filename

if __name__ == "__main__":

    while True:

        files_0 = glob.glob(join(cur_d, 'results/*_0.csv'))
        ...
        files_0 = glob.glob(join(cur_d, 'results/*_6.csv'))

        # start inserting to DB on all 7 databases

        pool_0 = Pool(5)
        pool_0.map_async(insert_to_db, files_0)
        ...
        pool_7 = Pool(5)
        pool_7.map_async(insert_to_db, files_7)

        pool_0.close()
        pool_0.join()
        ...
        pool_7.close()
        pool_7.join()

Это работает, но я должен использовать цикл while для продолжения обработки каталога, который получает постоянный поток файлов. Если в цикле while есть какие-либо активные соединения, другие должны ждать завершения цикла while.

Я не хочу запускать 7 отдельных скриптов. Есть ли способ сделать это во вложенных потоках / процессах?

...