Django многопроцессорные и соединения с базой данных - PullRequest
74 голосов
/ 23 ноября 2011

Справочная информация:

Я работаю над проектом, в котором используется Django с базой данных Postgres. Мы также используем mod_wsgi в случае, если это имеет значение, так как некоторые из моих поисков в сети упоминали об этом. При отправке веб-формы представление Django запускает задание, которое займет значительное время (больше, чем пользователь хотел бы ждать), поэтому мы запускаем задание через системный вызов в фоновом режиме. Задание, которое сейчас выполняется, должно иметь возможность чтения и записи в базу данных. Поскольку эта работа занимает так много времени, мы используем многопроцессорную обработку для ее параллельного выполнения.

Проблема:

Скрипт верхнего уровня имеет соединение с базой данных, и когда он порождает дочерние процессы, создается впечатление, что соединение родителя доступно для потомков. Тогда есть исключение о том, как SET TRANSACTION ISOLATION LEVEL должен вызываться перед запросом. Исследования показали, что это связано с попыткой использовать одно и то же соединение с базой данных в нескольких процессах. Один поток, который я нашел, предложил вызывать connection.close () при запуске дочерних процессов, чтобы Django автоматически создавал новое соединение, когда оно ему нужно, и поэтому у каждого дочернего процесса будет уникальное соединение, т.е. Это не сработало для меня, так как вызов connection.close () в дочернем процессе заставил родительский процесс жаловаться на то, что соединение потеряно.

Другие выводы:

Некоторые вещи, которые я читал, показывали, что вы не можете этого сделать, и что многопроцессорность, mod_wsgi и Django плохо сочетаются друг с другом. Кажется, в это трудно поверить.

Некоторые предлагали использовать сельдерей, что может быть долгосрочным решением, но я не могу установить сельдерей в настоящее время, ожидая некоторых процессов одобрения, поэтому сейчас не вариант.

Нашел несколько ссылок на SO и в других местах о постоянных соединениях с базой данных, что, по моему мнению, является другой проблемой.

Также найдены ссылки на psycopg2.pool и pgpool и кое-что о баунсере. По общему признанию, я не понимал большую часть того, что я читал на тех, но это конечно не выскакивало на меня как то, что я искал.

Текущий "Обходной путь":

На данный момент я вернулся к тому, чтобы запускать все поочередно, и это работает, но медленнее, чем хотелось бы.

Какие-либо предложения относительно того, как я могу использовать многопроцессорную работу для параллельной работы? Похоже, если бы у меня мог быть родитель и двое детей, которые имели бы независимые соединения с базой данных, все было бы в порядке, но я, кажется, не могу получить такое поведение.

Спасибо, и извините за длину!

Ответы [ 8 ]

64 голосов
/ 21 мая 2012

Мультипроцессирование копирует объекты соединения между процессами, потому что он разветвляет процессы, и, следовательно, копирует все файловые дескрипторы родительского процесса. Тем не менее, соединение с сервером SQL - это просто файл, вы можете увидеть его в linux в / proc // fd / .... любой открытый файл будет разделен между разветвленными процессами. Вы можете найти больше информации о разветвлении здесь .

Моим решением было просто закрыть соединение с БД непосредственно перед запуском процессов, каждый процесс заново создает соединение, когда оно ему понадобится (протестировано в django 1.4):

from django import db
db.connections.close_all()
def db_worker():      
    some_paralell_code()
Process(target = db_worker,args = ())

Pgbouncer / pgpool не связан с потоками в смысле многопроцессорности. Это скорее решение, чтобы не закрывать соединение при каждом запросе = ускорение соединения с postgres при высокой нагрузке.

Обновление:

Чтобы полностью устранить проблемы с подключением к базе данных, просто переместите всю логику, связанную с базой данных, в db_worker - я хотел передать QueryDict в качестве аргумента ... Лучшая идея - просто передать список идентификаторов ... См. QueryDict и values_list ('id', flat = True), и не забудьте включить его в список! list (QueryDict) перед передачей в db_worker. Благодаря этому мы не копируем подключение базы данных моделей.

def db_worker(models_ids):        
    obj = PartModelWorkerClass(model_ids) # here You do Model.objects.filter(id__in = model_ids)
    obj.run()


model_ids = Model.objects.all().values_list('id', flat=True)
model_ids = list(model_ids) # cast to list
process_count = 5
delta = (len(model_ids) / process_count) + 1

# do all the db stuff here ...

# here you can close db connection
from django import db
db.connections.close_all()

for it in range(0:process_count):
    Process(target = db_worker,args = (model_ids[it*delta:(it+1)*delta]))   
16 голосов
/ 27 января 2014

При использовании нескольких баз данных вы должны закрыть все соединения.

from django import db
for connection_name in db.connections.databases:
    db.connections[connection_name].close()

РЕДАКТИРОВАТЬ

Пожалуйста, используйте то же самое, что и @lechup, упомянутый для закрытия всех соединенийуверен, с какой версии django этот метод был добавлен):

from django import db
db.connections.close_all()
2 голосов
/ 17 октября 2018

У меня были проблемы с "закрытым соединением" при последовательном запуске Django тестовых случаев .Помимо тестов, существует еще один процесс, намеренно модифицирующий базу данных во время выполнения теста.Этот процесс запускается в каждом тестовом наборе setUp ().

Простым решением было наследование моих тестовых классов от TransactionTestCase вместо TestCase.Это гарантирует, что база данных была действительно записана, а другой процесс обновил данные.

2 голосов
/ 13 июля 2016

Для Python 3 и Django 1.9 это то, что у меня сработало:

import multiprocessing
import django
django.setup() # Must call setup

def db_worker():
    for name, info in django.db.connections.databases.items(): # Close the DB connections
        django.db.connection.close()
    # Execute parallel code here

if __name__ == '__main__':
    multiprocessing.Process(target=db_worker)

Обратите внимание, что без django.setup () я не смог бы заставить это работать. Я предполагаю, что для многопроцессорной обработки нужно что-то заново инициализировать.

1 голос
/ 29 июня 2015

Вы можете предоставить больше ресурсов Postgre, в Debian / Ubuntu вы можете редактировать:

nano /etc/postgresql/9.4/main/postgresql.conf

, заменив 9.4 на вашу версию postgre.

Вот несколько полезных строк, которые следуетобновленные с примерами значений для этого, имена говорят сами за себя:

max_connections=100
shared_buffers = 3000MB
temp_buffers = 800MB
effective_io_concurrency = 300
max_worker_processes = 80

Будьте осторожны, не увеличивайте слишком много этих параметров, так как это может привести к ошибкам, когда Postgre попытается получить больше ресурсов, чем доступно.Приведенные выше примеры отлично работают на машине с Debian 8GB Ram, оснащенной 4 ядрами.

1 голос
/ 31 октября 2013

Эй, я столкнулся с этой проблемой и смог ее решить, выполнив следующее (мы реализуем систему с ограниченными задачами)

task.py

from django.db import connection

def as_task(fn):
    """  this is a decorator that handles task duties, like setting up loggers, reporting on status...etc """ 
    connection.close()  #  this is where i kill the database connection VERY IMPORTANT
    # This will force django to open a new unique connection, since on linux at least
    # Connections do not fare well when forked 
    #...etc

ScheduledJob.py

from django.db import connection

def run_task(request, job_id):
    """ Just a simple view that when hit with a specific job id kicks of said job """ 
    # your logic goes here
    # ...
    processor = multiprocessing.Queue()
    multiprocessing.Process(
        target=call_command,  # all of our tasks are setup as management commands in django
        args=[
            job_info.management_command,
        ],
        kwargs= {
            'web_processor': processor,
        }.items() + vars(options).items()).start()

result = processor.get(timeout=10)  # wait to get a response on a successful init
# Result is a tuple of [TRUE|FALSE,<ErrorMessage>]
if not result[0]:
    raise Exception(result[1])
else:
   # THE VERY VERY IMPORTANT PART HERE, notice that up to this point we haven't touched the db again, but now we absolutely have to call connection.close()
   connection.close()
   # we do some database accessing here to get the most recently updated job id in the database

Честно говоря, для предотвращения условий гонки (с несколькими одновременными пользователями) было бы лучше как можно быстрее вызвать database.close () после того, как вы разветвляете процесс. Тем не менее, может существовать вероятность того, что другой пользователь, находящийся где-то внизу, полностью сделает запрос к БД, прежде чем у вас будет возможность очистить базу данных.

Честно говоря, было бы безопаснее и умнее , если бы ваша вилка не вызывала команду напрямую, а вместо этого вызывала сценарий в операционной системе, чтобы запускаемая задача в своей собственной оболочке Django!

1 голос
/ 23 ноября 2011

(не отличное решение, но возможный обходной путь)

если вы не можете использовать сельдерей, возможно, вы могли бы реализовать свою собственную систему очередей, в основном добавляя задачи в некоторую таблицу задач и имея обычный cron, которыйотбирает их и обрабатывает?(через команду управления)

0 голосов
/ 06 ноября 2017

Если все, что вам нужно, это параллелизм ввода-вывода, а не параллелизм обработки, вы можете избежать этой проблемы, переключив свои процессы на потоки. Заменить

from multiprocessing import Process

с

from threading import Thread

Объект Thread имеет тот же интерфейс, что и Procsess

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...