Несколько запросов с aiohttp, но отдельный тайм-аут для каждого запроса - PullRequest
2 голосов
/ 14 июля 2020

У меня огромный список URL-адресов (около 40 миллионов).

Я написал сценарий, который очищает этот URL-адрес с помощью многопоточности. Но мне нужно дополнительное решение, которое должно быть экономичным в ресурсах ОС, поэтому я решил также разработать версию ASYN C.

Я изучал asyncio и aiohttp в Python на неделю.

Ниже рабочий код:

from pathlib import Path
import time
import asyncio
import aiohttp
import pypeln as pl
import async_timeout


# for calculating the total elapsed time
start = time.time()

successful_counter = 0

# files and folders
urlFile = open('url500.txt', 'r')



# list for holding processed url's so far
urlList = []


#######################
# crawler function start
#######################
async def crawling(line, session1):  # function wrapper for parallelizing the process
    # getting URL's from the file
    
    global successful_counter
    
    line = line.strip()  
    
    # try to establish a connection
    try:
        async with async_timeout.timeout(25):
            async with session1.get('http://' + line) as r1:
                x = r1.headers
                if ('audio' in x['Content-Type'] or 'video' in x['Content-Type']):
                    print("Url: " + line + " is a streaming website \n")
                    return  # stream website, skip this website

                # means we have established a connection and got the expected result
                if r1.status // 100 == 2:
                    #print("Returned 2** for the URL:", line)
                    
                    try:
                        text1 = await r1.text()
                        successful_counter += 1

                        '''
                        f1 = open('200/' + line + '.html', 'w')
                        f1.write(text1)
                        f1.close()
                        '''

                    except Exception as exc:
                        print(line + ": " + str(exc))
                        return
                    
                    urlList.append(line)
                    return
                                
                else:
                    return

    # some error occured
    except Exception as exc:
        print("Url: " + line + " created the error: \n" + str(exc))
        return                
            
        
#######################
# crawler function end
#######################


async def main(tempList):

    '''
    limit = 1000
    await pl.task.each(
            crawling, tempList, workers=limit,
        )
    '''
    conn = aiohttp.TCPConnector(limit=0)
    custom_header1 = {'User-agent': 'Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/74.0'}


    #'''
    async with aiohttp.ClientSession(headers=custom_header1, connector=conn) as session1:
        await asyncio.gather(*[asyncio.ensure_future(crawling(url, session1)) for url in tempList])
    #'''

    return


asyncio.run(main(urlFile))

print("total successful: ", successful_counter)

# for calculating the total elapsed time
end = time.time()
print("Total elapsed time in seconds:", end-start)

Вот проблема: когда я не выставляю таймауты, он работает без проблем, но требует слишком много времени. Я хочу потратить не более 25 секунд на запрос. Если веб-сайт не дает мне ответа, я должен пропустить этот веб-сайт и двигаться дальше.

До сих пор все методы, которые я пробовал, не дали мне результата. Когда я где-то устанавливаю тайм-аут 25 секунд, он всегда ограничивает всю программу, а не один запрос. Итак, независимо от того, есть ли у меня файл с 500 URL-адресами или 1000000 URL-адресов, он всегда заканчивается через 25 секунд.

Я пробовал обернуть функцию искателя с помощью async_timeout, используя встроенный тайм-аут aiohttp library

async with session1.get('http://' + line, timeout=25)

Пытался создать сеанс внутри функции искателя и установить тайм-аут для сеанса (опять же с использованием встроенных методов aiohttp).

Ничего не получилось ... Возможно, мне не хватает чего-то огромного, но я застрял на несколько дней, и у меня закончились варианты, чтобы попробовать: D

1 Ответ

0 голосов
/ 16 июля 2020

В качестве отправной точки; Я бы порекомендовал создать небольшой скрипт для проверки минимума, чтобы разрешить получение запроса на тайм-аут, не влияя на другие запросы.

В приведенном ниже коде тайм-аут установлен на полсекунды. Все URL-адреса одинаковы (stackoverflow.com), за исключением одного, который указывает на localhost (который используется для проверки тайм-аута). Также, если URL-адрес - stackoverflow.com, код спит в течение 2 секунд (чтобы показать тайм-аут).

import asyncio
import aiohttp
import json

test_url = "https://stackoverflow.com/"

def Logger(json_message):
    print(json.dumps(json_message))

async def get_data(url):
    Logger({"start": "get_data()", "url": url})
    if url is test_url: #This is a test to make "test url" sleep longer than the timeout.   
        await asyncio.sleep(2) 

    timeout = aiohttp.ClientTimeout(total=0.5) # TODO - timeout after half a second.
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as results:            
                Logger({"finish": "get_data()", "url": url})
                return f"{ results.status } - {url}"
    except Exception as exc:
        Logger({"error": "get_data()", "url": url, "message": str(exc) })
        return f"fail - {url}"

async def main():
    urls = [test_url]*5 # create array of 5 urls
    urls[2] = "https://localhost:44344/" # Set third url to something that will timeout (after 0.5 sec).
    statements = [get_data(x) for x in urls]    
    Logger({"start": "gather()"})

    results = await asyncio.gather(*statements) 
    Logger({"finish": "gather()"})
    Logger({"results": ", ".join(results)})

if __name__ == '__main__':
    #asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # Use this to stop "Event loop is closed" error on Windows - https://github.com/encode/httpx/issues/914
    asyncio.run(main())

Вывод:

{"start": "gather()"}
{"start": "get_data()", "url": "https://stackoverflow.com/"}
{"start": "get_data()", "url": "https://stackoverflow.com/"}
{"start": "get_data()", "url": "https://localhost:44344/"}
{"start": "get_data()", "url": "https://stackoverflow.com/"}
{"start": "get_data()", "url": "https://stackoverflow.com/"}
{"error": "get_data()", "url": "https://localhost:44344/", "message": ""}
{"finish": "get_data()", "url": "https://stackoverflow.com/"}
{"finish": "get_data()", "url": "https://stackoverflow.com/"}
{"finish": "get_data()", "url": "https://stackoverflow.com/"}
{"finish": "get_data()", "url": "https://stackoverflow.com/"}
{"finish": "gather()"}
{"results": "200 - https://stackoverflow.com/, 200 - https://stackoverflow.com/, fail - https://localhost:44344/, 200 - https://stackoverflow.com/, 200 - https://stackoverflow.com/"}
...