Одновременный доступ на запись для базы данных python + sqlalchemy sqlite с использованием многопроцессорной обработки - PullRequest
0 голосов
/ 03 ноября 2019

У меня есть код, выполнение которого занимает много времени (~ 15 минут / цикл x 30 циклов). Кроме того, каждый цикл создает значительный объем данных, который также занимает некоторое время для записи на диск. В общем, запуск в последовательном режиме может занять несколько часов. Я надеюсь ускорить это с помощью модуля multiprocessing.

До сих пор я распараллеливал вычислительно дорогие части, но при записи вещей на диск все еще существует довольно большая точка засорения, поскольку все это должно сходиться обратно к основному потоку, а итоговая сумма выше несколькихмиллион записей. С этой целью я пытаюсь выяснить, могу ли я выполнить запись из каждой задачи multiprocess, чтобы ускорить процесс.

Каждый цикл создает несколько выходных данных output1, output2 и т. Д. и хочет записать свои данные в определенный (и известный) набор баз данных. IE - output1 -> db1, output2 -> db2. Эти базы данных одинаковы для каждого цикла, и данные просто добавляются к тому, что уже есть. Все данные также имеют уникальные индексы для запросов, поэтому порядок не важен.

Поскольку я использую pandas для фактического анализа данных, я использую его методы для хранения этих DataFrame s. ,to_hdf работает в последовательном случае, но не поддерживает параллельные записи (и, очевидно, могут иметь некоторые проблемы с повреждением), но я считаю, что to_sql делает. К сожалению, я не выяснил, как создавать соединения с базами данных (я использую sqlalchemy) в каждом процессе, чтобы одновременные записи были безопасными.

Я пробовал разные вещи, чтобы попытаться передать объекты между потоками, но я всегда, кажется, получаю ошибки, такие как Synchronized Objects should only be shared between processes through inheritance при передаче объектов через mp.Queue (такие вещи, как sqlalchemy.scoped_session) или can't pickle _thread._local objectsпри попытке передать их в качестве аргумента функции.

Я просто лаю здесь не то дерево? Или я упускаю что-то очевидное?

Ниже приводится «контрольный пример», который я использовал, чтобы опробовать вещи:

import multiprocessing as mp
import numpy as np
import pandas as pd

import sqlalchemy as sql

def sqltest(num):
    print("Hello from thread {0}".format(mp.current_process().name))
    data = np.random.random((1000000,4))*num
    asDF = pd.DataFrame(data,index=np.linspace(num,num+0.99,1000000))
    # Need to write to disk in here
    return None

def func_callback(c):
    print("Got completion callback {0}".format(c))

def err_callback(c):
    print("Got error callback {0}".format(c))

def main():
    nums = range(0,10)
    sqldb = sql.create_engine("sqlite:///test.db")

    with mp.Pool() as p:
        for i in range(0,10):
            p.apply_async(sqltest,callback=func_callback,error_callback=err_callback,args=(i,))
        p.close()
        p.join()

if __name__ == '__main__':
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...