Я столкнулся с ситуацией, когда мне нужно выполнить некоторые методы, которые одновременно включают операции с БД в приложении django. Дело в том, что все процессы используют соединение по умолчанию. Что вызывает исключение в методах. Я читал в других блогах и ответы на этом сайте и других, чтобы закрыть связи на db.connections.close()
, и это работает Но использование этого решения привело к другой проблеме, другие методы, которые используют БД, вызывают исключение. Что мне нужно сделать, это открыть новое соединение БД для всех процессов и закрыть соединение перед выходом из процесса. Вот как далеко я зашёл.
import time
from django.db import connections
from django.db.utils import DEFAULT_DB_ALIAS, load_backend
# Multithreading related Imports
import threading
from threading import Thread
# Multiprocessing related Imports
import multiprocessing as mp
from multiprocessing import JoinableQueue as jQueue
from multiprocessing import Pool as mp_pool
from multiprocessing import Process
class DBConnector(object):
def __init__(self):
print("Created a New Connection with DB--")
# creats new connection
def create_connection(self, alias=DEFAULT_DB_ALIAS):
connections.ensure_defaults(alias)
connections.prepare_test_settings(alias)
db = connections.databases[alias]
backend = load_backend(db['ENGINE'])
return backend.DatabaseWrapper(db, alias)
# For explicitly opening database connection
def __enter__(self):
self.dbconn = self.create_connection()
return self.dbconn
def __exit__(self, exc_type, exc_val, exc_tb):
self.dbconn.close()
def worker(func, inp, force_kill, queue, error, process_id):
"""
@summary: A method to wrap function
"""
try:
with DBConnector() as db:
res = func(inp)
queue.update({process_id: res})
except Exception as e:
if force_kill:
error.append(e)
class ProcessManager(object):
"""
@summary: A process manager to manage multiprocesses
"""
def __init__(self, inputs, function, timeout=ProcessManagerConfig.TIMEOUT,
force_kill=ProcessManagerConfig.FORCE_KILL, *args, **kwargs):
"""
@summary: A process manager initializer
"""
self.inputs = inputs
self.function = function
self.timeout = timeout
self.force_kill = force_kill
self.__queue = mp.Manager().dict()
self.__processes = []
self.__errors = mp.Manager().list()
self.cache_store = CacheStore()
def run(self):
"""
@summary: A method to run all the processes concurrently
"""
self.__create_processes()
self.__wait_for_compeletion()
def __create_processes(self, *args, **kwargs):
"""
@summary: A private method to create number of processes
"""
# Process ID is autoincremental starting from 1
process_id = 1
for inp in self.inputs:
p = Process(target=worker, args=(self.function, inp,
self.force_kill,
self.__queue, self.__errors,
process_id))
process_id += 1
self.__processes.append(p)
def __start_processes(self, *args, **kwargs):
"""
@summary: A private method to start all the process concurrently
"""
[process.start() for process in self.__processes]
def __wait_for_compeletion(self, *args, **kwargs):
"""
@summmary: A private method that waits for all the process execution
"""
start = time.time()
self.__start_processes()
while time.time() - start <= self.timeout:
if not any([p.is_alive() for p in self.__processes]):
break
if self.__errors:
error = list(self.__errors)[0]
self.__terminate()
raise error
time.sleep(0.1)
else:
self.__terminate()
msg = "Multiprocessing Reached Timeout: {} seconds".format(
self.timeout)
raise TimeoutException(msg, 408)
if not self.__errors:
# Successfully completed
for key, value in self.__queue.items():
self.store_process_output_to_cache(key, value)
def __terminate(self, *args, **kwargs):
"""
@summmary: A private method that kills all processes
"""
[process.terminate() for process in self.__processes]
def read_process_output_from_cache(self, process_id=None):
"""
@summary: A method to read output from CacheStore
@params process_id: ID of process to read data, None reads all
"""
return self.cache_store.read(process_id)
def store_process_output_to_cache(self, process_id, data):
"""
@summary: A method to store output to CacheStore
@params process_id: ID of process to store data
"""
self.cache_store.store(process_id, data)
Помогите мне с решением, более обобщенным c лучше, но, пожалуйста, укажите мне правильное направление, если у вас есть что-нибудь, что могло бы помочь.