ProactorEventLoop - ValueError: аргумент цикла должен согласовываться с Future - PullRequest
1 голос
/ 06 апреля 2019

Я использую этот асинхронный проект (называемый Broker - см. Git ) со следующим кодом:

   proxies = asyncio.Queue()
   broker = Broker(proxies)
   tasks = asyncio.gather(broker.find(types = ['HTTPS'], strict = True, limit = 10),
                               self.storeProxies(proxies))
   loop = asyncio.get_event_loop()
   loop.run_until_complete(tasks)

, где self.storeProxies - это асинхронная функция, которая содержит

while True:
    proxy = await proxies.get()
    if proxy is None: break
    self.proxies['http://{0}:{1}'.format(proxy.host, proxy.port)] = 1

Однако я не думаю, что это связано с частью Broker (конечно, не уверен).

Каждый раз, когда я запускаю этот код, после довольно случайного числа успехов он завершается с [WinError 10038] An operation was attempted on something that is not a socket.Затем я попытался провести какое-то исследование и в итоге получил этот ответ .Но, пытаясь с этим кодом, я получаю эту ошибку:

ValueError: loop argument must agree with Future

Есть идеи, как с этим бороться?

Подробности, которые могут быть полезны:

ОС: Windows 10 (версия 1809) Версия Python: 3.7.1

Основано на ответе Михаила Герасимова

async def storeProxies(self, proxies):
    logger.info("IN IT!")
    while True:
        proxy = await proxies.get()
        if proxy is None: break
        self.proxies['http://{0}:{1}'.format(proxy.host, proxy.port)] = 1

async def getfetching(self, amount):
    from proxybroker import Broker

    proxies = asyncio.Queue()
    broker = Broker(proxies)
    return await asyncio.gather(
        broker.find(
            types = ['HTTP', 'HTTPS'],
            strict = True,
            limit = 10
        ),
        self.storeProxies(proxies)
    )
def fetchProxies(self, amount):
    if os.name == 'nt':
        loop = asyncio.SelectorEventLoop() # for subprocess' pipes on Windows
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.get_event_loop()
    loop.run_until_complete(self.getfetching(amount))
    logger.info("FETCHING COMPLETE!!!!!!")
    logger.info('Proxies: {}'.format(self.proxies))

, где fetchProxies вызывается через определенные промежутки времени отдругое место.Это отлично работает в первый раз, но затем «терпит неудачу» с предупреждениями:

2019-04-06 21:04:21 [py.warnings] WARNING: d:\users\me\anaconda3\lib\site-packages\proxybroker\providers.py:78: DeprecationWarning: The object should be created from async function
  headers=get_headers(), cookies=self._cookies, loop=self._loop

2019-04-06 21:04:21 [py.warnings] WARNING: d:\users\me\anaconda3\lib\site-packages\aiohttp\connector.py:730: DeprecationWarning: The object should be created from async function
  loop=loop)

2019-04-06 21:04:21 [py.warnings] WARNING: d:\users\me\anaconda3\lib\site-packages\aiohttp\cookiejar.py:55: DeprecationWarning: The object should be created from async function
  super().__init__(loop=loop)

с последующим поведением, которое выглядит как бесконечный цикл (жестко застрявший ничего не выводит).Примечательно, что это также произошло со мной в примере с Герасимовым (в комментариях) при глобальном импорте Broker.Наконец я начинаю видеть свет туннеля.

1 Ответ

0 голосов
/ 06 апреля 2019

Взгляните на эту часть:

proxies = asyncio.Queue()

# ...

loop.run_until_complete(tasks)

asyncio.Queue(), когда созданный привязан к текущему циклу событий (цикл событий по умолчанию или один был установлен в текущий момент с помощью asyncio.set_event_loop () ).Обычно только объект цикла связан с этим.Если вы измените текущий цикл, вы должны воссоздать объект.То же самое верно для многих других asyncio объектов.

Чтобы убедиться, что каждый объект связан с новым циклом событий, лучше создавать связанные с asyncio объекты уже после того, как вы установили и запустили новый цикл событий.Это будет выглядеть так:

async def main():
    proxies = asyncio.Queue()
    broker = Broker(proxies)
    return await asyncio.gather(
        broker.find(
            types = ['HTTPS'], 
            strict = True, 
            limit = amount
        ),
        self.storeProxies(proxies)
    )


if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Иногда (редко) вам придется помещать некоторые импортные данные либо внутри main().


Upd:

Проблема возникает из-за того, что вы изменяете цикл событий между fetchProxies вызовами, но Broker импортируется только один раз (Python кэширует импортированные модули).

Перезагрузка Broker не работает для меняпоэтому я не смог найти лучшего способа, чем повторно использовать цикл событий, который вы установили один раз.

Замените этот код

if os.name == 'nt':
    loop = asyncio.SelectorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

на этот

if os.name == 'nt':
    loop = asyncio.get_event_loop()
    if not isinstance(loop, asyncio.SelectorEventLoop):
        loop = asyncio.SelectorEventLoop() # for subprocess' pipes on Windows
        asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

PS

Кстати, вам не нужно делать это в первую очередь:

if os.name == 'nt':
    loop = asyncio.SelectorEventLoop()
    asyncio.set_event_loop(loop)

Windows уже использует SelectorEventLoop по умолчанию, и она не поддерживаеттрубы ( документ ).

...