asyncio - запуск бесконечного цикла.create_connection () - PullRequest
0 голосов
/ 11 июня 2018

Я безуспешно пытался создать бесконечную цепочку клиентских экземпляров.

Я разрабатываю приложение asyncio, это приложение, среди многих других вещей, например, запуск сервера с loop.create_server(), необходимо каждые 10 секунд подключаться к списку серверов, отправлять некоторые данные и затем отключаться.

Я получаю 2 ошибки: «runtimeError: цикл обработки событий выполняется».или «asyncio> Задача была уничтожена, но она ожидает выполнения!»

Код внизу работает.

import asyncio
from tcp.client import Client


def send_to_peers(data):
    for index in range(1, 3): #this for-loop just for simulating a list of peers
    try:
        loop = asyncio.get_event_loop()
        coro = loop.create_connection(lambda: Client(), '127.0.0.1', 10000 + index)
        _, proto = loop.run_until_complete(coro)
        msg = data + "-" + str(index) + "\n"
        proto.transport.write(str.encode(msg))
        proto.transport.close()
    except ConnectionRefusedError as exc:
        print(exc)

def infinite():
    for index in range(5): #again this should be a While True:
        #there should be here an asyncio.sleep(10)
        send_to_peers(str(index))

infinite()

Но когда я вызываю его из main_loop, все начинает ломаться.

async def infinite_loop():
    for index in range(5):
        print("loop n " + str(index))
        task = asyncio.Task(send_to_peers(str(index)))
        await asyncio.sleep(10)
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task

main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(infinite_loop())
main_loop.run_forever()

Я пытался передать main_loop send_to_peers, передать его классу Client (loop), я пытался остановить и закрыть цикл, удалить задачу, использовать странную комбинацию sure_future, но ничего не работает.

Я гуглил столько, сколько мог, я читал, что нехорошо вкладывать бесконечные циклы, но я не нашел другого пути.

Моя последняя надежда - использовать многопоточность, но даже если ядумаю, что это сработает, это не будет ни элегантным решением, ни правильным.

Я привык работать с Node, поэтому, извините, если я сделал глупую ошибку, я подумал, что через 2 неделиЯ мог бы сделать это, но я здесь.

Я был бы очень признателен за любую помощь.Я застрял.Спасибо!

PS: класс Client () очень простой:

import asyncio
import logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s > %(message)s',
    stream=sys.stderr
)

class Client(asyncio.Protocol):

    def __init__(self):
        self.log = logging.getLogger('client')
        self.address = None
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log.debug('{}:{} connected'.format(*self.address))

    def data_received(self, data):
            self.log.debug('{}:{} just sent {!r}'.format(*self.address, data))

    def eof_received(self):
        self.log.debug('{}:{} sent EOF'.format(*self.address))

    def connection_lost(self, error=""):
        self.log.debug('{}:{} disconnected'.format(*self.address))
        self.transport.close()

1 Ответ

0 голосов
/ 11 июня 2018

Я получаю 2 ошибки: «runtimeError: цикл обработки событий работает».или «asyncio> Задача была уничтожена, но она ожидает выполнения!»

Как вы обнаружили, циклы событий asyncio не вкладывают .

Чтобы удалить вложение,Вы должны определить send_to_peers как сопрограмму, используя async def.Внутри loop.run_until_complete(coro) должно быть изменено на await coro.Если send_to_peers является сопрограммой, вы можете назвать его:

  • из кода блокировки, например infinite, используя loop.run_until_complete(send_to_peers(...))

  • изасинхронный код, такой как infinite_loop с использованием await send_to_peers(...).

В случае infinite_loop вы можете реализовать тайм-аут, используя asyncio.wait_for:

try:
    await asyncio.wait_for(send_to_peers(str(index)), 10)
except asyncio.TimeoutError:
    # ... timeout ...
...