У меня проблемы с производительностью при запросе BigQuery и выполнении http-вызовов для каждой строки в ответе BigQuery.
Я пытался выполнять вызовы как можно быстрее, используя aiohttp
, но этобыло бы полезно использовать несколько ядер.
В настоящее время я последовательно извлекаю данные из BigQuery.
Это то, на что похож код.
def generate(rows):
for row in rows:
yield json.dumps(dict(row), default=lambda x: str(x))
async def get(url, row, dag_run_id, bucket):
async with aiohttp.ClientSession() as session:
async with session.post(url,
params={'bucket': bucket,
'dag_run_id': dag_run_id},
json=row) as response:
return await response.read()
rows = bq.query(f'select * from table')
gen = generate(rows)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*[get(url, row, dag_run_id, staging_bucket) for row in gen]))
Я ожидаю, что смогу использовать ресурсы для решения этой проблемы, чтобы улучшить скорость.
Фактические результаты более узкие из-за того, что не используется столько ядер.