Python asyncio в рамках многопроцессорной обработки. Один цикл событий на процесс - PullRequest
0 голосов
/ 12 ноября 2018

Я пишу функцию для моей команды, которая будет загружать некоторые данные из облака. Сама функция представляет собой обычную функцию Python, но под капотом она использует asyncio. Итак, я создаю цикл обработки событий в моей функции и параллельно выполняю асинхронные сопрограммы. После загрузки данных я обрабатываю их и возвращаю результаты.

Моя функция работает, как и ожидалось, когда я вызываю ее из любой другой функции Python. Но, когда я пытаюсь распараллелить его с помощью многопроцессорной обработки, я иногда вижу некоторые ошибки ввода-вывода.

Я пытался найти пример того, как этого добиться, но я не смог его найти. Я вижу только рекомендации по использованию concurrent.futures, а для цикла событий run_in_executor выполняется распараллеливание. Это не вариант для меня, потому что я хочу скрыть все асинхронные вещи от своей команды и просто предоставить им эту простую функцию Python, которую они могут вызывать из своего кода (возможно, в многопроцессорной среде). В Интернете я видел аргументы о том, почему это плохая идея и почему я не должен скрывать асинхронные вещи, но в моем случае моя команда не является опытным программистом. Они никогда бы не использовали (или не удосужились понять) asyncio, поэтому простая функция python - это то, что нам лучше всего подходит.

Наконец, вот псевдо-пример, который показывает, что я пытаюсь сделать:

import asyncio
import aiohttp
from typing import List

async def _async_fetch_data(symbol: str) -> bytes:
    '''
    Download stock data for given symbol from yahoo finance.
    '''

    async with asyncio.BoundedSemaphore(50), aiohttp.ClientSession() as session:
        try:
            url = f'https://query1.finance.yahoo.com/v8/finance/chart/{symbol}?symbol={symbol}&period1=0&period2=9999999999&interval=1d'

            async with session.get(url) as response:
                return await response.read()
        except:
            return None

def fetch_data(symbols: List[str]) -> List[bytes]:
    ''' 
    Gateway function that wraps the under the hood async stuff 
    '''

    coroutine_list = [_async_fetch_data(x) for x in symbols]
    if len(coroutine_list) == 0:
        return []

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    data = loop.run_until_complete(asyncio.wait(coroutine_list))[0]
    loop.close()
    return [d.result() for d in data if d.result() is not None]

Это работает нормально, если я запускаю его как

>>> data = fetch_data(['AAPL', 'GOOG'])

Но я боюсь, что все будет хорошо, когда я сделаю

>>> from multiprocessing import Pool as ProcessPool
>>> with ProcessPool(2) as pool:
        data = [j for i in pool.map(fetch_data, [['AAPL', 'GOOG'], ['AMZN', 'MSFT']]) for j in i]

Я вижу случайные ошибки ввода-вывода, но я не могу воспроизвести их, и я не уверен, что это потому, что я смешиваю asyncio с многопроцессорностью или из-за чего-то еще.

...