Каналы Django отправляют групповое сообщение из задания Celery. Остановка цикла событий Asyncio до завершения всех асинхронных задач - PullRequest
1 голос
/ 03 октября 2019

В настоящее время я застрял на одной из самых сложных проблем, и я постараюсь объяснить это.

У меня есть проект Django, и его основная цель - быстро выполнять поставленные в очередь задачи из БД. Я использую Celery и Celerybeat, чтобы добиться этого с помощью каналов Django, чтобы обновлять свои шаблоны с ответами в режиме реального времени.

Рабочий Celery является рабочим пулом Gevent с приличным количеством потоков.

Моя задача (упрощенная версия):

@shared_task
def exec_task(action_id):
  # execute the action
  action = Action.objects.get(pk=action_id)
  response = post_request(action)

  # update action status
  if response.status_code == 200:
    action.status = 'completed'

  else:
    action.status = 'failed'

  # save the action to the DB
  action.save()

  channel_layer = get_channel_layer()
  status_data = {'id': action.id, 'status': action.status}
  status_data = json.dumps(status_data)
  try:
    async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
  except:
    event_loop = asyncio.get_running_loop()
    future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
    result = future.result()

Моя ошибка:

[2019-10-03 18: 47: 59,990: WARNING / MainProcess] Действия в очереди: 25

[2019-10-03 18: 48: 02,206: WARNING / MainProcess] c: \users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py: 123: RuntimeWarning: сопрограмма 'AsyncToSync.main_wrap' никогда не ожидалась
self._read_event = io_class (fileno, 1)

RuntimeWarning: включите tracemalloc, чтобы получить трассировку выделения объекта

[2019-10-03 18: 48: 02,212: WARNING / MainProcess] c: \ users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py: 123: RuntimeWarning: сопрограмма 'BaseEventLoop.shutdown_asyncgens' никогда не ожидалась. self._read_event = io_class (fileno, 1) RuntimeWarning:

Первоначальнопосле того, как я сохранил действие в БД, я просто позвонил:

async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})

Но я продолжал получать ошибку времени выполненияили потому что вы не можете использовать async_to_sync , если уже есть цикл событий asyncio , уже запущенный, , как показано здесь в строке 61 . Итак, у меня было несколько gevent потоков, пытающихся async_to_sync очень близко друг к другу, постоянно выдавая ошибку в ссылке.

Что привело меня к этому замечательному ответу и текущей версии exec_task , которая имеет 98% успеха при обмене сообщениями в группе каналов Django, но мне действительно нужно, чтобы она была 100%.

Проблема здесь в том, что иногдацикл событий asyncio останавливается до того, как добавляемый мной сопрограмм имеет шанс завершиться, и я настраивал свой код, играя с API-интерфейсом asyncio и цикла событий, но я либо ломаю свой код, либо получаю худшие результаты. У меня такое ощущение, что это может быть связано с функцией Asgiref async_to_sync , которая закрывает цикл раньше, но это сложно, и я только начал работать с python async пару дней назад.

Любые отзывы, комментарии, советы или исправления приветствуются!

Ура.

...