Цель состоит в том, чтобы попытаться загрузить большое количество «больших» jsons из s3. Я нашел aiobotocore
и почувствовал необходимость попробовать в надежде добиться большей эффективности и в то же время ознакомиться с asyncio
. Я дал ему шанс, и это работает, но я знаю в основном нада об асинхронном программировании. Поэтому я надеялся на некоторые улучшения / комментарии. Может быть, есть какие-то добрые души, которые могут заметить некоторые очевидные ошибки.
Проблема в том, что boto3 поддерживает только один http-запрос за раз. Используя Threadpool
, мне удалось добиться значительных улучшений, но я надеюсь на более эффективный путь.
Вот код:
Импорт:
import os
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE
Какой-то вспомогательный генератор, который я нашел где-то, чтобы возвращать декодированные jsons из строки с несколькими jsons.
def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
'''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
string = str(string_or_fp)
decoder = cls(**kwargs)
idx = WHITESPACE.match(string, 0).end()
while idx < len(string):
obj, end = decoder.raw_decode(string, idx)
yield obj
idx = WHITESPACE.match(string, end).end()
Эта функция получает ключи из корзины s3 с заданным префиксом:
# Async stuff starts here
async def get_keys(loop, bucket, prefix):
'''Get keys in bucket based on prefix'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = []
# list s3 objects using paginator
paginator = client.get_paginator('list_objects')
async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
for c in result.get('Contents', []):
keys.append(c['Key'])
return keys
Эта функция получает содержимое для предоставленного ключа. Неоправданно, что он выравнивает список декодируемого контента:
async def get_object(loop,bucket, key):
'''Get json content from s3 object'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
# get object from s3
response = await client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
content = await stream.read()
return list(iterload(content.decode()))
Вот основная функция, которая собирает содержимое для всех найденных ключей и выравнивает список содержимого.
async def go(loop, bucket, prefix):
'''Returns list of dicts of object contents'''
session = aiobotocore.get_session(loop=loop)
async with session.create_client('s3', region_name='us-west-2',
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
keys = await get_keys(loop, bucket, prefix)
contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])
return list(chain.from_iterable(contents))
Наконец, я запускаю это, и список результатов диктовок заканчивается в result
loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
Одна вещь, которую я думаю, может быть немного странной, это то, что я создаю клиента в каждой асинхронной функции. Вероятно, это можно поднять. Обратите внимание, что aiobotocore
работает с несколькими клиентами.
Кроме того, я думаю, что вам не нужно ждать, пока все ключи будут загружены, прежде чем загружать объекты для ключей, что, как мне кажется, имеет место в этой реализации. Я предполагаю, что как только ключ будет найден, вы можете позвонить get_object
. Так что, может быть, это должно быть async generator
. Но я не совсем ясно здесь.
Заранее спасибо! Надеюсь, это поможет кому-то в подобной ситуации.