Python Asyn c фоновая задача для опроса облачной очереди и контроля выхода при отсутствии сообщений - PullRequest
0 голосов
/ 25 марта 2020

Приложению необходимо непрерывно опрашивать 2 AWS очереди SQS для входящих сообщений и обрабатывать их по мере их поступления, но это необходимо делать в фоновом режиме, обеспечивая контроль, когда (а) ожидает сообщения. Также требуется возможность изящного отключения, следовательно, методы connect / disconnect приведены ниже.

Использование Python 3.8.2

import asyncio


class App():

    def connect(self):
        print('connecting')
        self.t1 = asyncio.create_task(self.poll_queues_1())
        self.t2 = asyncio.create_task(self.poll_queues_2())
        print('connection complete')

    def disconnect(self):
        print('disconnecting')
        self.t1.cancel()
        self.t2.cancel()
        print('disconnection complete')

    async def poll_queues_1(self):
        while True:
            # simulate an async method that polls a remote cloud Queue for incoming messages
            await asyncio.sleep(10)
            print('func 1 polling')

    async def poll_queues_2(self):
        while True:
            # simulate an async method that polls a remote cloud Queue for incoming messages
            await asyncio.sleep(10)
            print('func 2 polling')
  1. Раньше это работало так, как есть, даже не касаясь события l oop - I думаю, всегда был один бегущий на заднем плане. Теперь я сменил компьютеры, и почему-то это не работает на моем новом компьютере - я получаю это:
In [2]: app = App()

In [3]: app.connect()
connecting
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-3-f4a7413208aa> in <module>
----> 1 app.connect()

<ipython-input-1-2b38a506601f> in connect(self)
      6     def connect(self):
      7         print('connecting')
----> 8         self.t1 = asyncio.create_task(self.poll_queues_1())
      9         self.t2 = asyncio.create_task(self.poll_queues_2())
     10         print('connection complete')

~\Miniconda3\envs\quant-trading\lib\asyncio\tasks.py in create_task(coro, name)
    379     Return a Task object.
    380     """
--> 381     loop = events.get_running_loop()
    382     task = loop.create_task(coro)
    383     _set_task_name(task, name)

RuntimeError: no running event loop
Проблема в том, что если я оберну все это в asyncio.run(), я никогда не получу обратно контроль.

1 Ответ

0 голосов
/ 26 марта 2020

Я не могу комментировать разницу в поведении между двумя компьютерами. Это не имеет никакого смысла. Но я могу обратиться к вашему пункту № 2. Программирование в Asyncio требует другого подхода.

Я могу дать только общее представление о том, как может быть структурировано ваше приложение.

Возьмите все, что вы хотели бы, чтобы ваша программа делала, кроме соедините вещи, которые вы описали в своем вопросе. Поместите все эти функции в сопрограмму:

async def all_the_other_stuff():
     # your code here

Теперь вы создаете основную функцию примерно так:

async def main():
     app = App()
     app.connect()
     await all_the_other_stuff()
     app.disconnect()

В качестве последней строки в скрипте (обычно):

asyncio.run(main())

Одно важное предостережение: для того, чтобы это работало, функция all_the_other_stuff должна содержать одно или несколько ожиданий. В отличие от многопоточности, с asyncio параллелизм кооператив . Как только задача получает контроль, она остается под контролем до следующего ожидания, после чего она сдается, и другие задачи получают шанс на выполнение. Только с одним потоком и одним процессом - так и должно быть; если вы хотите превентивный (в отличие от кооперативного) параллелизма, вы должны использовать потоки или процессы.

...