Объедините aiohttp с многопроцессорностью - PullRequest
0 голосов
/ 29 декабря 2018

Я делаю скрипт, который получает HTML-код почти 20 000 страниц и анализирует его, чтобы получить только его часть.

Мне удалось получить содержимое в 20 000 страниц в кадре данных с асинхронными запросами, используя asyncio и aiohttp, но этот сценарий все еще ждет, пока все страницы будут выбраны для их анализа.

async def get_request(session, url, params=None):
    async with session.get(url, headers=HEADERS, params=params) as response:
        return await response.text()


async def get_html_from_url(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_request(session, url))
        html_page_response = await asyncio.gather(*tasks)
    return html_page_response


html_pages_list = asyncio_loop.run_until_complete(get_html_from_url(urls))

Получив содержимое каждой страницы, мне удалось использовать пул многопроцессорной обработки для распараллеливания синтаксического анализа.

get_whatiwant_from_html(html_content):

    parsed_html = BeautifulSoup(html_content, "html.parser")
    clean = parsed_html.find("div", class_="class").get_text()

    # Some re.subs
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)  

    return clean


pool = Pool(4)
what_i_want = pool.map(get_whatiwant_from_html, html_content_list)

Этот код асинхронно смешивает выборку и синтаксический анализ, но мне бы хотелосьинтегрировать в него многопроцессорность:

async def process(url, session):
    html = await getRequest(session, url)
    return await get_whatiwant_from_html(html)

async def dispatch(urls):
    async with aiohttp.ClientSession() as session:
        coros = (process(url, session) for url in urls)
        return await asyncio.gather(*coros)

result = asyncio.get_event_loop().run_until_complete(dispatch(urls))

Есть ли очевидный способ сделать это?Я думал о создании 4 процессов, каждый из которых выполняет асинхронные вызовы, но реализация выглядит немного сложной, и мне интересно, есть ли другой способ.

Я очень новичок в asyncio и aiohttp, так что если у вас есть что-то, чтобыпосоветуйте мне почитать, чтобы лучше понять, я буду очень счастлив.

Ответы [ 2 ]

0 голосов
/ 03 января 2019

Вы можете использовать ProcessPoolExecutor .

С run_in_executor вы можете выполнять IO в вашем основном асинхронном процессе.

Но ваши тяжелые вычисления ЦП вотдельные процессы.

async def get_data(session, url, params=None):
    loop = asyncio.get_event_loop()
    async with session.get(url, headers=HEADERS, params=params) as response:
        html = await response.text()
        data = await loop.run_in_executor(None, partial(get_whatiwant_from_html, html))
        return data

async def get_data_from_urls(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_data(session, url))
        result_data = await asyncio.gather(*tasks)
    return result_data

executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)
asyncio_loop.set_default_executor(executor)
results = asyncio_loop.run_until_complete(get_data_from_urls(urls))
0 голосов
/ 29 декабря 2018

Вы можете увеличить скорость анализа, изменив парсер BeautifulSoup с html.parser на lxml , который является самым быстрым, за которым следует html5lib.html .parser - самый медленный из них.

Ваше узкое место - не проблема обработки, а IO.Вам может потребоваться несколько потоков, а не процесс:

Например, вот программа-шаблон, которая очищает и спит, чтобы сделать ее медленной, но работает в нескольких потоках и, таким образом, быстрее завершить задачу.

from concurrent.futures import ThreadPoolExecutor
import random,time
from bs4 import BeautifulSoup as bs
import requests

URL = 'http://quotesondesign.com/wp-json/posts'

def quote_stream():
    '''
    Quoter streamer
    '''
    param = dict(page=random.randint(1, 1000))
    quo = requests.get(URL, params=param)

    if quo.ok:
        data = quo.json()
        author = data[0]['title'].strip()

        content = bs(data[0]['content'], 'html5lib').text.strip()

        print(f'{content}\n-{author}\n')

    else:
        print('Connection Issues :(')

def multi_qouter(workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        _ = [executor.submit(quote_stream) for i in range(workers)]

if __name__ == '__main__':
    now = time.time()

    multi_qouter(workers=4)

    print(f'Time taken {time.time()-now:.2f} seconds')

В вашем случае создайте функцию, которая выполняет задание, которое вы хотите от звездного до конца.Эта функция будет принимать URL и необходимые параметры в качестве аргументов.После этого создайте другую функцию, которая вызывает предыдущую функцию в разных потоках, каждый из которых имеет свой URL.Поэтому вместо i в диапазоне (..) для URL в URL.Вы можете запускать 2000 потоков одновременно, но я бы предпочел куски, скажем, 200, работающие параллельно.

...