Я хочу запустить задачу Celery в моем представлении django, но эта задача блокирует программу. Когда я запускаю эту же задачу в оболочке django, все работает. Я думаю, что приложение сельдерея неправильно загружается uvicorn?
Использование команды для отправки задачи в брокер (redis) send_to_websocket.apply_asyn c ((self.user.pk,), queue = 'websocket', priority = 1)
Я использую uvicorn для запуска django и django channel
uvicorn my_app.asgi:application --host 0.0.0.0 --port 8080 --workers 4 --reload
My asgi.py
import os
import django
from channels.routing import get_default_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
django.setup()
application = get_default_application()
Переменная Asgi в настройках. py
ASGI_APPLICATION = "my_app.routing.application"
my_app / routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator, OriginValidator
from api import channels as api_channel
application = ProtocolTypeRouter({
# (http->django views is added by default)
'websocket': AllowedHostsOriginValidator(
api_channel.TokenAuthMiddleware(
URLRouter(
api_channel.routing.urlpatterns
)
)
),
})
my_app / _ _ init_ _.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
my_app / celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
import django
from kombu import Exchange, Queue
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
django.setup()
# set the default Django settings module for the 'celery' program and scanning for tasks.
app = Celery('api')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)
app.conf.task_default_queue = 'default'
default_routing_key = 'default'
default_exchange = Exchange('default', type='direct')
default_queue = Queue(
'default',
default_exchange,
routing_key=default_routing_key)
websocket = Queue(
'websocket',
default_exchange,
routing_key=default_routing_key)
app.conf.task_queues = (default_queue, websocket)
app.conf.task_default_queue = default_queue
app.conf.task_default_exchange = default_exchange
app.conf.task_default_routing_key = default_routing_key