Потребитель asyn c rabbitmq с сокетом connect_ex - PullRequest
0 голосов
/ 20 июня 2020

Я хочу написать программу python, которая получает порт ip и tcp с сервера rabbitmq и сканирует, чтобы проверить, открыт ли порт, поскольку эти проверки иногда выполняются массово (возможно, 100 портов, пары ip добавляются в очередь за раз) Мне нужно выполнять сканирование асинхронно, чтобы получить все результаты вовремя, и даже если я уменьшу тайм-аут до 1 секунды, 30 закрытых портов будут удерживать сканирование в течение 30 секунд каждый раз! Я пробовал asyncio и aio_pika, чтобы достичь своей цели, но сканирование все равно выполняется синхронно.

import asyncio
import aio_pika
import socket


async def tcp_check(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    await asyncio.sleep(1)
    result = sock.connect_ex((host,port))
    print (str(result))

async def main(loop):
    connection = await aio_pika.connect_robust("amqp://user:password@192.168.1.100/")
    async with connection:
        queue_name = "tcp_scans"
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, auto_delete=False, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    context = message.body.decode("utf-8").split(',')
                    await tcp_check(context[0], int(context[1]))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

UPDATE:

Я тоже использовал asyncio.open_connection:

async def tcp_check(host, port):
    con = asyncio.open_connection(host, port, loop=loop)
    try:
        await asyncio.wait_for(con, timeout=1)
        print("{}:{} Connected".format(host, port))
    except asyncio.TimeoutError:
        print ("{}:{} Closeed".format(host, port))

Тем не менее, он берет каждый элемент из списка и тестирует один за другим ...

1 Ответ

0 голосов
/ 20 июня 2020

Следует избегать вызова синхронных длительных функций внутри асинхронных сопрограмм. Я бы предложил использовать asyncio-альтернативу connect_ex, например:

    try:
        await asyncio.open_connection(host, port)
    except Exception as e:
        print(e)

Для одновременного выполнения некоторых сопрограмм «на лету» вы можете использовать create_task, который «оборачивает сопрограмму в задачу и расписание его выполнение "как написано в do c. И после этого сопрограмма будет выполнена вскоре, например, после следующей await или async for итерации, когда поток управления вернется к событию-l oop. create_task вернуть Task объект, который вы можете добавить в список и дождаться их завершения, используя asyncio.gather с флагом return_exceptions=True. Но в вашем случае, я думаю, будет достаточно заменить await tcp_check() на create_task(tcp_check()) и использовать gather в конце вашего main (), чтобы гарантировать, что все сопрограммы завершены.

...
asyncio.create_task(tcp_check(context[0], int(context[1])))
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...