У меня есть код, выполнение которого занимает много времени (~ 15 минут / цикл x 30 циклов). Кроме того, каждый цикл создает значительный объем данных, который также занимает некоторое время для записи на диск. В общем, запуск в последовательном режиме может занять несколько часов. Я надеюсь ускорить это с помощью модуля multiprocessing
.
До сих пор я распараллеливал вычислительно дорогие части, но при записи вещей на диск все еще существует довольно большая точка засорения, поскольку все это должно сходиться обратно к основному потоку, а итоговая сумма выше несколькихмиллион записей. С этой целью я пытаюсь выяснить, могу ли я выполнить запись из каждой задачи multiprocess
, чтобы ускорить процесс.
Каждый цикл создает несколько выходных данных output1
, output2
и т. Д. и хочет записать свои данные в определенный (и известный) набор баз данных. IE - output1 -> db1
, output2 -> db2
. Эти базы данных одинаковы для каждого цикла, и данные просто добавляются к тому, что уже есть. Все данные также имеют уникальные индексы для запросов, поэтому порядок не важен.
Поскольку я использую pandas
для фактического анализа данных, я использую его методы для хранения этих DataFrame
s. ,to_hdf
работает в последовательном случае, но не поддерживает параллельные записи (и, очевидно, могут иметь некоторые проблемы с повреждением), но я считаю, что to_sql
делает. К сожалению, я не выяснил, как создавать соединения с базами данных (я использую sqlalchemy
) в каждом процессе, чтобы одновременные записи были безопасными.
Я пробовал разные вещи, чтобы попытаться передать объекты между потоками, но я всегда, кажется, получаю ошибки, такие как Synchronized Objects should only be shared between processes through inheritance
при передаче объектов через mp.Queue
(такие вещи, как sqlalchemy.scoped_session
) или can't pickle _thread._local objects
при попытке передать их в качестве аргумента функции.
Я просто лаю здесь не то дерево? Или я упускаю что-то очевидное?
Ниже приводится «контрольный пример», который я использовал, чтобы опробовать вещи:
import multiprocessing as mp
import numpy as np
import pandas as pd
import sqlalchemy as sql
def sqltest(num):
print("Hello from thread {0}".format(mp.current_process().name))
data = np.random.random((1000000,4))*num
asDF = pd.DataFrame(data,index=np.linspace(num,num+0.99,1000000))
# Need to write to disk in here
return None
def func_callback(c):
print("Got completion callback {0}".format(c))
def err_callback(c):
print("Got error callback {0}".format(c))
def main():
nums = range(0,10)
sqldb = sql.create_engine("sqlite:///test.db")
with mp.Pool() as p:
for i in range(0,10):
p.apply_async(sqltest,callback=func_callback,error_callback=err_callback,args=(i,))
p.close()
p.join()
if __name__ == '__main__':
main()