Я нашел способ!
Я понимаю, что проблема не в процессе загрузки.Скорее всего, поскольку запрошенный сервер замедлял мои запросы, использование многопроцессорности имело больше смысла.Я не использовал multiprocess
сразу;эта функция является движущейся частью в конвейере Luigi , поэтому я не был уверен, как я могу использовать многопроцессорность в коде, который уже порождает несколько обработок на одну задачу.
Я попробовал и использовал concurrent.futures
(думаю, только для> 3,6), и результаты были более чем удовлетворительными.Это та же самая функция, что и выше, но с распараллеливанием:
def stream_download_s3_parallel(url,
aws_key,
aws_secret,
aws_bucket_name,
path,
auth,
max_workers=10):
"""
Stream files from request to S3
"""
headers = {'Authorization': f'Bearer {auth}',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1.1 Safari/605.1.15'}
session = boto3.Session(
aws_access_key_id=aws_key,
aws_secret_access_key=aws_secret
)
with requests.Session() as s:
s.headers.update(headers)
try:
with s.get(url) as r:
if r.status_code == requests.codes.ok:
soup = BeautifulSoup(r.content)
download_files = [link.contents[0] for link in
soup.find_all('a') if '.tif' in
link.contents[0]]
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {executor.submit(requests_to_s3,
url,
file_name,
aws_bucket_name,
path,
auth,
session): file_name for file_name in download_files}
return future_to_url
Здесь requests_to_s3
- простая функция, которая принимает несколько параметров, необходимых для отправки запроса и загрузки на S3 с использованием smart_open
, в основном то же самоекод в вопросе.concurrent.futures.ThreadPoolExecutor
возвращает генератор всех процессов, запущенных в пуле.Поскольку я сохраняю это непосредственно на S3, играть с этим не имеет смысла, но если это ваш случай, вы можете сделать что-то вроде:
results_process = []
for treat_proc in concurrent.futures.as_completed(future_to_url):
results_process.append(threat_proc.result())
Это добавит любую вашу функциювернуть его в список results_process
.
Я до сих пор не уверен, что предпочитаю этот способ многопроцессорности, а не старой библиотеке multiprocessing
, кажется, cleaner .