websocket запускает функцию asyn c, но возвращает ошибку: asyncio.run () не может быть вызвано из цикла выполнения событий - PullRequest
1 голос
/ 21 февраля 2020

Я пытаюсь использовать django -каналы 2 для создания веб-сокетов. Мне нужно запустить метод asyn c, который должен возвращать выходные данные команды, чтобы я мог передать данные пользователю на моем веб-сайте. Моя проблема в том, что он не позволяет мне запустить его и выдает ошибку:

asyncio.run() cannot be called from a running event loop

Что я делаю не так и что я могу с этим поделать?

consumer.py

class ChatConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        await self.send({
            "type": "websocket.accept"
        })

        user = self.scope['user']
        get_task_id = self.scope['url_route']['kwargs']['task_id']

        await asyncio.run(self.run("golemcli tasks show {}".format(get_task_id)))

        await self.send({
            "type": "websocket.send",
            "text": "hey"
        })
    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        print("disconnected", event)



    async def run(self, cmd):
        proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

        stdout, stderr = await proc.communicate()

        print(f'[{cmd!r} exited with {proc.returncode}]')
        if stdout:
            print(f'[stdout]\n{stdout.decode()}')
        if stderr:
            print(f'[stderr]\n{stderr.decode()}')

Трассировка:

Exception inside application: asyncio.run() cannot be called from a running event loop
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 62, in __call__
    await await_many_dispatch([receive], self.dispatch)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/utils.py", line 52, in await_many_dispatch
    await dispatch(result)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 19, in websocket_connect
    await asyncio.run(self.run("golemcli tasks show {}".format(get_task_id)))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 34, in run
    "asyncio.run() cannot be called from a running event loop")
  asyncio.run() cannot be called from a running event loop

ОБНОВЛЕНИЕ 2 При использовании следующего фрагмента:

await self.run("golemcli tasks show {}".format(get_task_id)

Возвращается следующая трассировка:

Exception inside application: Cannot add child handler, the child watcher does not have a loop attached
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 62, in __call__
    await await_many_dispatch([receive], self.dispatch)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/utils.py", line 52, in await_many_dispatch
    await dispatch(result)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 19, in websocket_connect
    await self.run("golemcli tasks show {}".format(get_task_id))
  File "/Users/golemgrid/Documents/GitHub/GolemGrid/overview/consumers.py", line 37, in run
    stderr=asyncio.subprocess.PIPE)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/subprocess.py", line 202, in create_subprocess_shell
    stderr=stderr, **kwds)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 1503, in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
    self._child_watcher_callback, transp)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/unix_events.py", line 924, in add_child_handler
    "Cannot add child handler, "
  Cannot add child handler, the child watcher does not have a loop attached

1 Ответ

1 голос
/ 22 февраля 2020

Вы уже находитесь в асинхронном режиме c runl oop, поэтому в зависимости от того, что вы хотите сделать, вы можете либо:

Обрабатывать вызов run в запросах потребителей на выполнение l oop

(Требуется python 3.8 из-за ошибки )

await self.run("golemcli tasks show {}".format(get_task_id)) нет необходимости во вложенном runl oop.

Это будет означать, если любые новые сообщения приходят к потребителю, пока subprocess выполняет свою работу, они будут поставлены в очередь и не будут выполняться, пока не завершится subprocess. (честно говоря, самое простое решение).

это будет означать, что ваше hey сообщение будет отправлено после того, как метод run завершится.

Создайте вложенный runl oop

(Требуется python 3,8 из-за ошибки )

Если вы хотите, чтобы ваш потребитель мог обрабатывать новые сообщения (отправляемые ему), пока ваш subprocess , (это намного сложнее, и вам не следует делать это, если вам не нужна эта функция для подключения к веб-сокету).

    async def websocket_connect(self, event):
        ...

        # start the run
        self.run_task = asyncio.create_task(self.run(...))        

        ...

    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        print("disconnected", event)

        if not self.run_task.done():
            # Clean up the task for the queue we created
            self.run_task.cancel()
            try:
                # let us get any exceptions from the nested loop
                await self.run_task
            except CancelledError:
                # Ignore this error as we just triggered it
                pass
        else:
            # throw any error from this nested loop
            self.run_task.result()

Запускать как задачу блокировки, но в пуле потоков

Изменить запуск быть заданием syn c.

    # 2. Run in a custom thread pool:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, self.run, "your cmd")
        print('custom thread pool', result)

В качестве отдельного примечания по безопасности ваш код может легко позволить кому-то запускать команды bash на вашем сервере, поскольку вы берете необработанную строку из задачи идентификатор в URL. Я предлагаю добавить некоторую проверку здесь, возможно, если у вас есть запись задачи в вашей БД, использующая значение url для поиска задач в БД, а затем используйте значение id из записи при сборке вашей команды, или если это невозможно по крайней мере убедитесь, что task_id имеет очень строгую проверку регулярных выражений, чтобы гарантировать, что он не может содержать символы, которые вы не ожидаете.

...