Получить JSON, используя Python и AsyncIO - PullRequest
0 голосов
/ 08 ноября 2018

Не так давно я начал учиться асинчо.И я столкнулся с проблемой.Мой код не заканчивается.Я не могу понять это.Помоги мне, пожалуйста!

import signal
import sys
import asyncio
import aiohttp
import json

loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)

async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_cont(subreddit, client):
    data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=50')

    jn = json.loads(data1.decode('utf-8'))

    print('DONE:', subreddit)

def signal_handler(signal, frame):
    loop.stop()
    client.close()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

for key in {'python':1, 'programming':2, 'compsci':3}:
    asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_forever()

Результат:

DONE: compsci  
DONE: programming  
DONE: python  
...

Я пытался что-то сделать, но результат был нестабильным.

future = []
for key in {'python':1, 'programming':2, 'compsci':3}:
    future=asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_until_complete(future)

Результат (1 задание вместо 3):

DONE: compsci  
[Finished in 1.5s]  

Я решил свой вопрос следующим образом:

Добавлено:

async with aiohttp.ClientSession () as a client:

AT:

async def get_reddit_cont (subreddit, client):  

И:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = [get_reddit_cont(subreddit,client) for subreddit in range(1,6)]
    result = loop.run_until_complete(asyncio.gather(*futures))

Но когда код завершен, я получаю сообщение:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x034021F0>
[Finished in 1.0s]

Iне понимаю, почему это происходит.

Но когда я пытаюсь выполнить "для ключа" около 60 или более раз, я получаю ошибку:

...
aiohttp.client_exceptions.ClientOSError: [WinError10054] Удаленный хост принудительно прервал существующее соединение

Ответы [ 3 ]

0 голосов
/ 08 ноября 2018

Вот несколько предлагаемых изменений с контекстом в комментариях.

Если у вас нет уникального варианта использования или вы просто экспериментируете ради обучения, вероятно, не должно быть оснований для использования signal - asyncio с функциями верхнего уровня, которые позволяют вам решить, когда закрыть и завершить цикл обработки событий.

import asyncio
import logging
import sys

import aiohttp

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s:%(message)s')

URL = 'https://www.reddit.com/r/{subreddit}/top.json?sort=top&t=day&limit=50'


async def get_json(client: aiohttp.ClientSession, url: str) -> dict:
    # If you're going to be making repeated requests, use this
    # over .get(), which is just a wrapper around `.request()` and
    # involves an unneeded lookup
    async with client.request('GET', url) as response:

        # Raise if the response code is >= 400.
        # Some 200 codes may still be "ok".
        # You can also pass raise_for_status within
        # client.request().
        response.raise_for_status()

        # Let your code be fully async.  The call to json.loads()
        # is blocking and won't take full advantage.
        #
        # And it does largely the same thing you're doing now:
        # https://github.com/aio-libs/aiohttp/blob/76268e31630bb8615999ec40984706745f7f82d1/aiohttp/client_reqrep.py#L985
        j = await response.json()
        logging.info('DONE: got %s, size %s', url, j.__sizeof__())
        return j


async def get_reddit_cont(keys, **kwargs) -> list:
    async with aiohttp.ClientSession(**kwargs) as session:
        # Use a single session as a context manager.
        # this enables connection pooling, which matters a lot when
        # you're only talking to one site
        tasks = []
        for key in keys:
            # create_task: Python 3.7+
            task = asyncio.create_task(
                get_json(session, URL.format(subreddit=key)))
            tasks.append(task)
        # The result of this will be a list of dictionaries
        # It will only return when all of your subreddits
        # have given you a response & been decoded
        #
        # To process greedily, use asyncio.as_completed()
        return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    default = ('python', 'programming', 'compsci')
    keys = sys.argv[1:] if len(sys.argv) > 1 else default
    sys.exit(asyncio.run(get_reddit_cont(keys=keys)))

Выход:

$ python3 asyncreddit.py 
2018-11-07 21:44:49,495:Using selector: KqueueSelector
2018-11-07 21:44:49,653:DONE: got https://www.reddit.com/r/compsci/top.json?sort=top&t=day&limit=50, size 216
2018-11-07 21:44:49,713:DONE: got https://www.reddit.com/r/python/top.json?sort=top&t=day&limit=50, size 216
2018-11-07 21:44:49,947:DONE: got https://www.reddit.com/r/programming/top.json?sort=top&t=day&limit=50, size 216

Редактировать: из вашего вопроса:

Но когда код завершен, я получаю сообщение: Unclosed client session

Это потому, что вам нужно .close() client объект, так же, как вы бы файловый объект. Вы можете сделать это двумя способами:

  • Назовите это явно: client.close(). Безопаснее обернуть это в блок try / finally, чтобы убедиться, что он закрыт независимо от того, что
  • Или (более простой способ) использовать клиент в качестве асинхронного диспетчера контекста, как в этом ответе. Это означает, что после окончания блока async with сеанс автоматически закрывается с помощью метода .__aexit__().

connector является базовым TCPConnector, который является атрибутом сеанса. Он обрабатывает пул соединений, и это то, что в конечном итоге остается открытым в вашем коде.

0 голосов
/ 09 ноября 2018

Я решил проблему следующим образом:

import asyncio
import aiohttp
import json

async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_cont(subreddit):
    async with aiohttp.ClientSession(loop=loop) as client:
        data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=50')

        jn = json.loads(data1.decode('utf-8'))

        print('DONE:', subreddit)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = [get_reddit_cont(subreddit) for subreddit in {'python':1, 'programming':2, 'compsci':3}]
    result = loop.run_until_complete(asyncio.gather(*futures))
0 голосов
/ 08 ноября 2018

Ответ лежит в вашем коде. Вот подсказка loop.run_forever(). Так что вам нужно будет позвонить loop.stop(). Я бы использовал условие, такое как предложение if или цикл while.

if we_have_what_we_need:
    signal_handler(signal, frame)

или

while we_dont_have_what_we_need:
    loop.forever()

Первый остановит ваш код при выполнении условия. Последний будет продолжать работать, пока не будет выполнено условие.

[UPDATE]

Мы также можем использовать;

(Python Docs)

loop.run_until_complete(future)

Запуск до тех пор, пока будущее (экземпляр Future) не завершится.

Если аргумент является объектом сопрограммы, он неявно запланирован на запустить как asyncio.Task.

Вернуть результат будущего или вызвать исключение.

loop.run_forever()

Запуск цикла обработки событий до вызова stop ().

...