Django операций с БД - PullRequest
       0

Django операций с БД

0 голосов
/ 20 января 2020

Я столкнулся с ситуацией, когда мне нужно выполнить некоторые методы, которые одновременно включают операции с БД в приложении django. Дело в том, что все процессы используют соединение по умолчанию. Что вызывает исключение в методах. Я читал в других блогах и ответы на этом сайте и других, чтобы закрыть связи на db.connections.close(), и это работает Но использование этого решения привело к другой проблеме, другие методы, которые используют БД, вызывают исключение. Что мне нужно сделать, это открыть новое соединение БД для всех процессов и закрыть соединение перед выходом из процесса. Вот как далеко я зашёл.

import time
from django.db import connections
from django.db.utils import DEFAULT_DB_ALIAS, load_backend
# Multithreading related Imports
import threading
from threading import Thread
# Multiprocessing related Imports
import multiprocessing as mp
from multiprocessing import JoinableQueue as jQueue
from multiprocessing import Pool as mp_pool
from multiprocessing import Process


class DBConnector(object):
    def __init__(self):
        print("Created a New Connection with DB--")

    # creats new connection
    def create_connection(self, alias=DEFAULT_DB_ALIAS):
        connections.ensure_defaults(alias)
        connections.prepare_test_settings(alias)
        db = connections.databases[alias]
        backend = load_backend(db['ENGINE'])
        return backend.DatabaseWrapper(db, alias)
    # For explicitly opening database connection

    def __enter__(self):
        self.dbconn = self.create_connection()
        return self.dbconn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dbconn.close()

def worker(func, inp, force_kill, queue, error, process_id):
    """
    @summary: A method to wrap function
    """
    try:
        with DBConnector() as db:
            res = func(inp)
        queue.update({process_id: res})
    except Exception as e:
        if force_kill:
            error.append(e)
class ProcessManager(object):
    """
    @summary: A process manager to manage multiprocesses
    """
    def __init__(self, inputs, function, timeout=ProcessManagerConfig.TIMEOUT,
                 force_kill=ProcessManagerConfig.FORCE_KILL, *args, **kwargs):
        """
        @summary: A process manager initializer
        """
        self.inputs = inputs
        self.function = function
        self.timeout = timeout
        self.force_kill = force_kill
        self.__queue = mp.Manager().dict()
        self.__processes = []
        self.__errors = mp.Manager().list()
        self.cache_store = CacheStore()

    def run(self):
        """
        @summary: A method to run all the processes concurrently
        """
        self.__create_processes()
        self.__wait_for_compeletion()

    def __create_processes(self, *args, **kwargs):
        """
        @summary: A private method to create number of processes
        """
        # Process ID is autoincremental starting from 1
        process_id = 1
        for inp in self.inputs:
            p = Process(target=worker, args=(self.function, inp,
                                             self.force_kill,
                                             self.__queue, self.__errors,
                                             process_id))
            process_id += 1
            self.__processes.append(p)

    def __start_processes(self, *args, **kwargs):
        """
        @summary: A private method to start all the process concurrently
        """
        [process.start() for process in self.__processes]

    def __wait_for_compeletion(self, *args, **kwargs):
        """
        @summmary: A private method that waits for all the process execution
        """
        start = time.time()
        self.__start_processes()
        while time.time() - start <= self.timeout:
            if not any([p.is_alive() for p in self.__processes]):
                break
            if self.__errors:
                error = list(self.__errors)[0]
                self.__terminate()
                raise error
            time.sleep(0.1)
        else:
            self.__terminate()
            msg = "Multiprocessing Reached Timeout: {} seconds".format(
                self.timeout)
            raise TimeoutException(msg, 408)
        if not self.__errors:
            # Successfully completed
            for key, value in self.__queue.items():
                self.store_process_output_to_cache(key, value)

    def __terminate(self, *args, **kwargs):
        """
        @summmary: A private method that kills all processes
        """
        [process.terminate() for process in self.__processes]

    def read_process_output_from_cache(self, process_id=None):
        """
        @summary: A method to read output from CacheStore
        @params process_id: ID of process to read data, None reads all
        """
        return self.cache_store.read(process_id)

    def store_process_output_to_cache(self, process_id, data):
        """
        @summary: A method to store output to CacheStore
        @params process_id: ID of process to store data
        """
        self.cache_store.store(process_id, data)

Помогите мне с решением, более обобщенным c лучше, но, пожалуйста, укажите мне правильное направление, если у вас есть что-нибудь, что могло бы помочь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...