Просит + s3 мучительно медленно - PullRequest
0 голосов
/ 25 июня 2019

Я использую Python requests с smart_open для загрузки файла (файл .tif, в случае, если это помогает) и загрузки его в корзину S3 без сохранения какого-либо временного файла. Я перебираю несколько тысяч URL-адресов для каждого запроса. Это функция, которую я написал:

def stream_download_s3(url,
                       aws_key,
                       aws_secret,
                       aws_bucket_name,
                       path,
                       auth):
    """
    Stream files from request to S3
    """

    headers = {'Authorization': f'Bearer {auth}',
               'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1.1 Safari/605.1.15'}
    session = boto3.Session(
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret
    )

    bucket_path_strip = path[5:]
    bucket_name_strip = aws_bucket_name[5:]

    with requests.Session() as s:
        s.headers.update(headers)
        try:
            with s.get(url) as r:
                if r.status_code == requests.codes.ok:
                    soup = BeautifulSoup(r.content)
                    download_files = [link.contents[0] for link in
                                      soup.find_all('a') if '.tif' in
                                     link.contents[0]]

                    for file_name in download_files:
                        save_file = os.path.join(path,
                                                 file_name)

                        if check_s3_exists(session, 
                                           bucket_name_strip,
                                           os.path.join(bucket_path_strip, file_name)):
                            print(f'S3: {os.path.join(path, file_name)} already exists. Skipping download')
                        else:
                            with s.get(url + file_name) as file_request:
                                if file_request.status_code == requests.codes.ok:
                                    with smart_open.open(save_file, 'wb', transport_params=dict(session=session)) as so:
                                        so.write(file_request.content)

                else:
                    print(f'Request GET failed with {r.content} [{r.url}]')

        except requests.exceptions.HTTPError as err:
            print(f'{err}')

Эта функция делает первый запрос на очистку всех доступных URL-адресов изображений (это часть bs4), а затем перебирает все найденные URL-адреса и загружает их содержимое. Возвращенное содержимое запроса - это двоичный файл, который я отправляю в smart_open open функцию для загрузки на S3.

Весь процесс занимает ~ 150 минут для 510 изображений (менее 2 Гб), тогда как комбинация wget и aws s3 ls делает то же самое за ~ 86 минут (wget заняла 1h 26m 46s и s3 cp заняло секунды).

Некоторые варианты на рассмотрении:

  • Я работаю на машине с AWS, хотя некоторые API-интерфейсы запрещают использование IP-адресов типа AWS, но это не так. Если это делает загрузку медленнее, я не знаю. Также S3 и EC2 находятся в одном регионе.
  • Я знаю, что stream=True в requests.get() является альтернативой, но, насколько я знаю, это в основном полезно для потоковой передачи больших файлов без заполнения памяти. Это может что-то изменить?
  • Аналогичная реализация с io.BytesIO дает аналогичные результаты. Я что-то там не так делаю?
  • Я использую requests, потому что мне нравится API ( много !), Но если есть какая-либо другая альтернатива, я открыт для попытки :-)

1 Ответ

0 голосов
/ 26 июня 2019

Я нашел способ!

Я понимаю, что проблема не в процессе загрузки.Скорее всего, поскольку запрошенный сервер замедлял мои запросы, использование многопроцессорности имело больше смысла.Я не использовал multiprocess сразу;эта функция является движущейся частью в конвейере Luigi , поэтому я не был уверен, как я могу использовать многопроцессорность в коде, который уже порождает несколько обработок на одну задачу.

Я попробовал и использовал concurrent.futures (думаю, только для> 3,6), и результаты были более чем удовлетворительными.Это та же самая функция, что и выше, но с распараллеливанием:

def stream_download_s3_parallel(url,
                                aws_key,
                                aws_secret,
                                aws_bucket_name,
                                path,
                                auth,
                                max_workers=10):
    """
    Stream files from request to S3
    """

    headers = {'Authorization': f'Bearer {auth}',
               'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1.1 Safari/605.1.15'}
    session = boto3.Session(
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret
    )

    with requests.Session() as s:
        s.headers.update(headers)
        try:
            with s.get(url) as r:
                if r.status_code == requests.codes.ok:
                    soup = BeautifulSoup(r.content)
                    download_files = [link.contents[0] for link in
                                      soup.find_all('a') if '.tif' in
                                     link.contents[0]]

                    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                        future_to_url = {executor.submit(requests_to_s3, 
                                                         url,
                                                         file_name,
                                                         aws_bucket_name,
                                                         path,
                                                         auth,
                                                         session): file_name for file_name in download_files}
                        return future_to_url


Здесь requests_to_s3 - простая функция, которая принимает несколько параметров, необходимых для отправки запроса и загрузки на S3 с использованием smart_open, в основном то же самоекод в вопросе.concurrent.futures.ThreadPoolExecutor возвращает генератор всех процессов, запущенных в пуле.Поскольку я сохраняю это непосредственно на S3, играть с этим не имеет смысла, но если это ваш случай, вы можете сделать что-то вроде:

results_process = []
for treat_proc in concurrent.futures.as_completed(future_to_url):
    results_process.append(threat_proc.result())

Это добавит любую вашу функциювернуть его в список results_process.

Я до сих пор не уверен, что предпочитаю этот способ многопроцессорности, а не старой библиотеке multiprocessing, кажется, cleaner .

...