В настоящее время я застрял на одной из самых сложных проблем, и я постараюсь объяснить это.
У меня есть проект Django, и его основная цель - быстро выполнять поставленные в очередь задачи из БД. Я использую Celery и Celerybeat, чтобы добиться этого с помощью каналов Django, чтобы обновлять свои шаблоны с ответами в режиме реального времени.
Рабочий Celery является рабочим пулом Gevent с приличным количеством потоков.
Моя задача (упрощенная версия):
@shared_task
def exec_task(action_id):
# execute the action
action = Action.objects.get(pk=action_id)
response = post_request(action)
# update action status
if response.status_code == 200:
action.status = 'completed'
else:
action.status = 'failed'
# save the action to the DB
action.save()
channel_layer = get_channel_layer()
status_data = {'id': action.id, 'status': action.status}
status_data = json.dumps(status_data)
try:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
except:
event_loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
result = future.result()
Моя ошибка:
[2019-10-03 18: 47: 59,990: WARNING / MainProcess] Действия в очереди: 25
[2019-10-03 18: 48: 02,206: WARNING / MainProcess] c: \users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py: 123: RuntimeWarning: сопрограмма 'AsyncToSync.main_wrap' никогда не ожидалась
self._read_event = io_class (fileno, 1)
RuntimeWarning: включите tracemalloc, чтобы получить трассировку выделения объекта
[2019-10-03 18: 48: 02,212: WARNING / MainProcess] c: \ users \ jack \ documents \ github \ mcr-admin \ venv \ lib \ site-packages \ gevent_socket3.py: 123: RuntimeWarning: сопрограмма 'BaseEventLoop.shutdown_asyncgens' никогда не ожидалась. self._read_event = io_class (fileno, 1) RuntimeWarning:
Первоначальнопосле того, как я сохранил действие в БД, я просто позвонил:
async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
Но я продолжал получать ошибку времени выполненияили потому что вы не можете использовать async_to_sync , если уже есть цикл событий asyncio , уже запущенный, , как показано здесь в строке 61 . Итак, у меня было несколько gevent потоков, пытающихся async_to_sync очень близко друг к другу, постоянно выдавая ошибку в ссылке.
Что привело меня к этому замечательному ответу и текущей версии exec_task , которая имеет 98% успеха при обмене сообщениями в группе каналов Django, но мне действительно нужно, чтобы она была 100%.
Проблема здесь в том, что иногдацикл событий asyncio останавливается до того, как добавляемый мной сопрограмм имеет шанс завершиться, и я настраивал свой код, играя с API-интерфейсом asyncio и цикла событий, но я либо ломаю свой код, либо получаю худшие результаты. У меня такое ощущение, что это может быть связано с функцией Asgiref async_to_sync , которая закрывает цикл раньше, но это сложно, и я только начал работать с python async пару дней назад.
Любые отзывы, комментарии, советы или исправления приветствуются!
Ура.