Как я могу использовать модели django в сопрограмме asyncio, запущенной в рамках задачи сельдерея? - PullRequest
0 голосов
/ 16 февраля 2019

Я реорганизовал некоторый код django, чтобы сделать некоторый веб-анализ.Я запускаю отдельную задачу Celery для каждого пользователя, для которого я собираю данные.В каждой задаче Celery я использую asyncio и aiohttp, чтобы выполнить очистку для данного пользователя.

Я могу получить доступ ко всем своим классам и методам модели django, но как только я что-то предпринимаю для запуска фактического запроса к базе данных, я получаю сообщение об ошибке:

...
[2019-02-16 18:04:38,126] WARNING log /home/chrisadmin/anaconda3/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: OperationalError('SSL SYSCALL error: Bad file descriptor\n',):
Traceback (most recent call last):
  File "/home/chrisadmin/anaconda3/lib/python3.6/site-packages/django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
psycopg2.OperationalError: SSL SYSCALL error: Socket operation on non-socket
...

ВнутриЗадачи Celery, я могу делать вещи, которые заставляют Django взаимодействовать с базой данных без каких-либо проблем, если только они не связаны с асинхронностью.Кроме того, я могу успешно взаимодействовать с базой данных через Django в задачах asyncio, если эти задачи asyncio в свою очередь не запускаются из задачи Celery.

Если я установлю CELERY_TASK_ALWAYS_EAGER=True, я не получуисключения, но, конечно, в этом случае задачи Celery не запускаются одновременно.

Для удаления одного пользователя более чем достаточно asyncio / aiohttp.Но я хочу использовать Celery, чтобы иметь возможность масштабировать процессы / машины и параллельно обрабатывать несколько пользователей.Ранее я пытался использовать исключительно Celery, но я пытался выполнить рефакторинг с помощью asyncio / aiohttp, чтобы уменьшить накладные расходы там, где это не нужно.

Я хочу иметь возможность использовать Celery для параллельного запуска очистки для нескольких пользователей изатем в каждой задаче Celery я хочу иметь возможность очищать соответствующих пользователей, в том числе сохранять их очищенные данные с помощью моделей / методов django.

1 Ответ

0 голосов
/ 17 февраля 2019

После некоторого дальнейшего исследования кажется, что есть проблемы с соединениями с базой данных и возможной безопасностью потоков при объединении асинхронных и сельдерейных программ, которые я не до конца понимаю.

Решение, которое, кажется, работает такдалеко было создать новую функцию:

from django.db import connections
from django.conf import settings


def reset_db_connections():
    if not settings.CELERY_TASK_ALWAYS_EAGER:
        connections.close_all()

Я называю эту функцию первой строкой любой задачи сельдерея:

@shared_task(bind=True)
def my_celery_task(self, args):
    reset_db_connections()
    # do stuff
    # call stuff that uses asyncio

Пока что, похоже, мой код работаетнезависимо от того, какая у меня настройка для CELERY_TASK_ALWAYS_EAGER.

Первоначально вместо connections.close_all() я пытался:

 for conn in db.connections.all():
     if conn.connection.closed != 0:
         conn.connection.close()

, но это приводило к ошибкам, когда я не закрывал соединения, которые нужно было закрыть, и / или я закрывал соединения, которые былиуже закрыт.

В качестве альтернативы вышеприведенному решению я обнаружил, что изменяя настройку:

DATABASES = {"default": dj_database_url.config(conn_max_age=600)}

на

DATABASES = {"default": dj_database_url.config(conn_max_age=0)}

также решил проблему.Однако, насколько я понимаю, установка conn_max_age=0 будет использовать новое соединение для каждой операции с БД, что не кажется хорошей идеей.С подходом reset_db_connections(), приведенным выше, я могу оставить conn_max_age=600.

...