Я пытаюсь написать программу, используя asyncio
, и был ориентирован на этот блог пост.То, что я пытаюсь сделать, это получить некоторые данные JSON одновременно.Для одного входного фрейма данных.однако я хотел бы обработать запрошенные данные дальше, как только они станут доступны.
Таким образом, в основном есть две группы задач:
- обрабатывает данные в df1 одновременно и делает некоторыеcalc как только JSON вернул
- обрабатывать данные в df2 одновременно
Они более или менее независимы друг от друга, но я хочу также запустить группу задач одновременно.Как только обе группы задач будут завершены, я хочу продолжить их обработку.
Мой вопрос заключается в том, правильно ли разработана моя реализация в терминах шаблонов asyncio
, где я только что использовал два оператора сбора?Или это неправильная концепция?Вот скетч:
import asyncio
import aiohttp
from aiohttp import ClientSession
async def fetch_json(url: str, session: ClientSession, data: json.dumps) -> Dict:
resp = await session.get(url=url, headers={"content-type": "application/json"}, data=data)
resp.raise_for_status()
logger.info("Got response [%s] for URL: %s", resp.status, url)
json = await resp.json()
return json
async def some_calc(url: str, session: ClientSession, data: json.dumps):
res = await fetch_json(url=url, session=session, data=data)
return [float(x) for x in res]
async def process_data(df: Dict, url: str, session: ClientSession):
async with session:
tasks = []
for data in df:
try:
if df1:
task = some_calc(url=url, session=session, data=data)
else:
task = fetch_json(url=url, session=session, data=data)
except Exception as e:
# ...
tasks.append(
task
)
res = await asyncio.gather(*tasks)
return res
async def bulk_execute(df1, df2):
url = "http://some.url/"
async with ClientSession() as session:
res = await asyncio.gather(process_data(df1, url, session), process_data(df2, url, session))
return res
if __name__ == "__main__":
res = asyncio.run(bulk_execute(df1, df2))