Предположим, у меня есть следующая система, предназначенная для выполнения асинхронных c запросов к некоторому URL с использованием asyncio
loop = asyncio.get_event_loop()
data = [...]
try:
tasks = [self.make_request(key, item) for key, item in data]
loop.run_until_complete(tasks)
finally:
loop.close()
def make_request(key, item):
async with aiohttp.ClientSession(headers=self.headers) as session:
response = await session.post("http:/xx.com", json={"item": item})
print("OK.")
Переменная данных содержит все сообщения, которые я хочу обработать, и их использование с помощью asyncio работает нормально. , Затем RabbitMQ встает на пути, и потребление из очереди блокирует что-либо после строки basic_consume.
def handle(message):
loop = asyncio.get_event_loop()
data = [...]
try:
task = asyncio.create_task(self.make_request(0, message))
loop.run_until_complete(task)
finally:
loop.close()
connection = pika.BlockingConnection(#...params)
channel = connection.channel()
channel.basic_consume(
queue='somequeue',
on_message_callback=handle,
)
channel.start_consuming()
метод handle вызывается каждый раз, когда потребитель получает новое сообщение, передавая параметр сообщения в качестве каждого элемента в переменной данных. Поэтому вместо того, чтобы иметь все сопрограммы одновременно, я получаю их по одному, как должен быть код на этом этапе?