Чтение нескольких «больших» jsons из s3 асинхронно.Есть ли способ лучше? - PullRequest
0 голосов
/ 19 апреля 2019

Цель состоит в том, чтобы попытаться загрузить большое количество «больших» jsons из s3. Я нашел aiobotocore и почувствовал необходимость попробовать в надежде добиться большей эффективности и в то же время ознакомиться с asyncio. Я дал ему шанс, и это работает, но я знаю в основном нада об асинхронном программировании. Поэтому я надеялся на некоторые улучшения / комментарии. Может быть, есть какие-то добрые души, которые могут заметить некоторые очевидные ошибки.

Проблема в том, что boto3 поддерживает только один http-запрос за раз. Используя Threadpool, мне удалось добиться значительных улучшений, но я надеюсь на более эффективный путь.

Вот код:

Импорт:

import os 
import asyncio
import aiobotocore
from itertools import chain
import json
from json.decoder import WHITESPACE

Какой-то вспомогательный генератор, который я нашел где-то, чтобы возвращать декодированные jsons из строки с несколькими jsons.

def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    '''helper for parsing individual jsons from string of jsons (stolen from somewhere)'''
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()

Эта функция получает ключи из корзины s3 с заданным префиксом:

# Async stuff starts here
async def get_keys(loop, bucket, prefix):
    '''Get keys in bucket based on prefix'''

    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        keys = []
        # list s3 objects using paginator
        paginator = client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for c in result.get('Contents', []):
                keys.append(c['Key'])
        return keys

Эта функция получает содержимое для предоставленного ключа. Неоправданно, что он выравнивает список декодируемого контента:

async def get_object(loop,bucket, key):
    '''Get json content from s3 object'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:


        # get object from s3
        response = await client.get_object(Bucket=bucket, Key=key)
        async with response['Body'] as stream:
            content = await stream.read()    

    return list(iterload(content.decode()))       

Вот основная функция, которая собирает содержимое для всех найденных ключей и выравнивает список содержимого.

async def go(loop, bucket, prefix):
    '''Returns list of dicts of object contents'''
    session = aiobotocore.get_session(loop=loop)
    async with session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID) as client:

        keys = await get_keys(loop, bucket, prefix)

        contents = await asyncio.gather(*[get_object(loop, bucket, k) for k in keys])     

        return list(chain.from_iterable(contents))

Наконец, я запускаю это, и список результатов диктовок заканчивается в result

loop = asyncio.get_event_loop()
result = loop.run_until_complete(go(loop, 'some-bucket', 'some-prefix'))
  • Одна вещь, которую я думаю, может быть немного странной, это то, что я создаю клиента в каждой асинхронной функции. Вероятно, это можно поднять. Обратите внимание, что aiobotocore работает с несколькими клиентами.

  • Кроме того, я думаю, что вам не нужно ждать, пока все ключи будут загружены, прежде чем загружать объекты для ключей, что, как мне кажется, имеет место в этой реализации. Я предполагаю, что как только ключ будет найден, вы можете позвонить get_object. Так что, может быть, это должно быть async generator. Но я не совсем ясно здесь.

Заранее спасибо! Надеюсь, это поможет кому-то в подобной ситуации.

1 Ответ

1 голос
/ 21 апреля 2019

первая проверка aioboto3

секунда, каждый клиент в aiobotocore связан с сеансом aiohttp.Каждый сеанс может иметь до max_pool_connections .Вот почему в примере basic aiobotocore он делает async with на create_client.Поэтому пул закрывается после завершения работы с клиентом.

Вот несколько советов:

  1. Вам следует использовать рабочий пул, созданный мной, модульный CaliDog чтобы не загрязнять ваш цикл событий.Когда вы используете это, думайте о своем рабочем процессе как о потоке.
  2. Это избавит вас от необходимости использовать asyncio.gather, который оставит задачи в фоновом режиме после появления первого исключения.
  3. Выследует настроить размер рабочего цикла и max_pool_connections вместе, и использовать только один клиент с количеством задач, которые вы хотите (или можете на основе требуемых вычислений) поддерживать параллельно.
  4. Вам действительно не нужно передаватьцикл вокруг, как в современных версиях Python, есть один цикл на поток
  5. Вы должны использовать профили aws (параметр профиля для параметра Session init) / переменные окружения , чтобы вам не нужно было жестко кодировать ключ и регионинформация.

Исходя из вышеизложенного, вот как я бы это сделал:

import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


def iterload(string_or_fp, cls=json.JSONDecoder, **kwargs):
    # helper for parsing individual jsons from string of jsons (stolen from somewhere)
    string = str(string_or_fp)

    decoder = cls(**kwargs)
    idx = WHITESPACE.match(string, 0).end()
    while idx < len(string):
        obj, end = decoder.raw_decode(string, idx)
        yield obj
        idx = WHITESPACE.match(string, end).end()


async def get_object(s3_client, bucket: str, key: str):
    # Get json content from s3 object

    # get object from s3
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()

    return list(iterload(content.decode()))


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of dicts of object contents
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.AioSession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(get_object, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key']))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


_loop = asyncio.get_event_loop()
_result = _loop.run_until_complete(go('some-bucket', 'some-prefix'))
...