I am using multiprocessing (apply_async) to upload several parts of a file to S3 in parallel with the S3 multipart upload feature. The steps are as follows (a minimal sequential sketch of the corresponding boto3 calls is shown right after the list):
- Generate a multipart upload id (mpu_id).
- Split the 2 GB file into 512 MB chunks and upload all of them in parallel using multiprocessing.
- Complete the multipart upload.
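For reference, here is a minimal sequential sketch of what the three steps map to in plain boto3 (no multiprocessing); the bucket, key and file path are placeholders, not the values from my actual code:

import boto3

s3 = boto3.client("s3")
bucket, key, local_path = "my-bucket", "my-key", "/tmp/bigfile.bin"  # placeholders

# Step 1: generate the multipart upload id.
mpu_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]

# Step 2: upload the file in 512 MB chunks (sequential here, parallel in my code).
part_size = 512 * 1024 * 1024
parts = []
with open(local_path, "rb") as f:
    part_num = 1
    while True:
        data = f.read(part_size)
        if not data:
            break
        resp = s3.upload_part(Body=data, Bucket=bucket, Key=key,
                              UploadId=mpu_id, PartNumber=part_num)
        parts.append({"PartNumber": part_num, "ETag": resp["ETag"]})
        part_num += 1

# Step 3: complete the multipart upload.
s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu_id,
                             MultipartUpload={"Parts": parts})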
The problem occurs in the second step: since the number of processors is 4, most of the time 3 of the processes finish quickly (1-2 minutes), but one of them takes much longer to complete, around 10-15 minutes.
Below is a sample of the code:
import logging
import math
import os
from multiprocessing import Pool

from filechunkio import FileChunkIO

# client (a boto3 S3 client), logger, success_status, file_part_size, region,
# one_mb_bytes, as_percent, Errors and FileUploads are defined elsewhere in the module.

###############################################################################
## _upload is called to upload the different parts of the file to S3 in     ##
## parallel using apply_async of multiprocessing.                           ##
###############################################################################
def _upload(local_path, bucket, key, file_size, mpu_id, part_num, offset, bytes):
    with FileChunkIO(local_path, 'rb', offset=offset, bytes=bytes) as data:
        print(offset, bytes, data)
        part = client.upload_part(
            Body=data, Bucket=bucket, Key=key, UploadId=mpu_id, PartNumber=part_num)
    logger.info("{0} of {1} uploaded ({2:.3f}%) successfully - Part number : {3} ETag: {4} Status code : {5}".format(
        offset + bytes, file_size,
        as_percent(offset + bytes, file_size), part_num, part['ETag'], part["ResponseMetadata"]["HTTPStatusCode"]))
    if part["ResponseMetadata"]["HTTPStatusCode"] != success_status:
        logging.warning(f"Part upload failed for below:"
                        f"\nPartNumber : {part_num}"
                        f"\nETag : {part['ETag']}"
                        f"\nUploadId : {mpu_id}"
                        f"\nBucket : {bucket}"
                        f"\nkey : {key}")
    return part, part_num
## Callback function to collect the result of the multiprocessing function _upload.
parts = []
def mycallback(x):
    parts.append({"PartNumber": x[1], "ETag": x[0]['ETag']})
class S3MultipartUpload(FileUploads):
    def __init__(self,
                 path,
                 bucket,
                 part_size=file_part_size,
                 profile_name=None,
                 region_name=region,
                 verbose=False):
        self.total_bytes = os.stat(path).st_size
        self.part_bytes = part_size
        FileUploads.__init__(self, path, bucket)
    ## The create function generates the mpu id for further processing.
    def create(self):
        logger.info("Executing create function to generate multipart upload id.")
        try:
            mpu = client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            mpu_id = mpu["UploadId"]
        except Exception as e:
            logger.error("An error occurred while creating the multipart upload id.", trace=True)
            Errors(e, logger=logger).errorrun()
        logger.info(f"Multipart upload id generated for key {self.key} in bucket {self.bucket} : {mpu_id}")
        return mpu_id
    ## The upload function divides the file into parts and uploads each part (with its ETag) to S3.
    def upload(self, mpu_id, local_path, file_size, bucket, key):
        logger.info(f"Executing upload function to divide the file into chunks of {self.part_bytes/one_mb_bytes} MB and then upload them to S3.")
        try:
            logger.info(f"Size of the file : {file_size/one_mb_bytes} MB")
            bytes_per_chunk = file_part_size
            logger.info(f"Size of each part after dividing the file : {bytes_per_chunk/one_mb_bytes} MB")
            chunk_amount = int(math.ceil(file_size / float(bytes_per_chunk)))
            logger.info(f"Number of parts created after dividing the file : {chunk_amount}")
            pool = Pool(processes=None)
            results = list()
            logger.info("Parallel execution started for uploading file parts to S3.")
            for i in range(chunk_amount):
                offset = i * bytes_per_chunk
                remaining_bytes = file_size - offset
                bytes = min(bytes_per_chunk, remaining_bytes)
                part_num = i + 1
                r = pool.apply_async(
                    _upload,
                    [local_path, bucket, key, file_size, mpu_id, part_num, offset, bytes],
                    callback=mycallback)
                results.append(r)
                # print(r.get())
            pool.close()
            pool.join()
            logger.info("Parallel execution completed and pool is closed.")
        except Exception as e:
            logger.error("An error occurred while uploading parts to S3.", trace=True)
            Errors(e, logger=logger).errorrun()
        logger.info("Upload process completed.")
        return parts
    ## The complete function merges all the parts into one object in S3 using the mpu_id and ETags.
    def complete(self, mpu_id, parts):
        logger.info("Executing multipart upload completion function to combine all the parts into one object in S3.")
        try:
            parts_sorted = sorted(parts, key=lambda i: (i['PartNumber'], i['ETag']))
            result = client.complete_multipart_upload(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=mpu_id,
                MultipartUpload={"Parts": parts_sorted})
        except Exception as e:
            logger.error("An error occurred while completing the multipart upload.", trace=True)
            Errors(e, logger=logger).errorrun()
        logger.info(f"Status code after completion of multipart upload : {result['ResponseMetadata']['HTTPStatusCode']}")
        return result
    def multi_part_upload(self, local_path, file_size):
        # Orchestrates the three steps: create the upload id, upload the parts
        # in parallel, then complete the multipart upload.
        mpu_id = self.create()
        parts = self.upload(mpu_id, local_path, file_size, self.bucket, self.key)
        result = self.complete(mpu_id, parts)
        status_code = result["ResponseMetadata"]["HTTPStatusCode"]
        if status_code == success_status:
            logger.info("Multipart upload was closed and completed successfully.")
        else:
            logger.warning("Multipart upload was not completed successfully.")
        return status_code
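For context, the entry point is invoked roughly like this (just a sketch: the path and bucket name are placeholders, and FileUploads, which is not shown here, is assumed to set self.bucket and self.key from its arguments):

local_path = "/data/bigfile.bin"  # placeholder path
uploader = S3MultipartUpload(local_path, "my-bucket")
status_code = uploader.multi_part_upload(local_path, os.stat(local_path).st_size)
print(status_code)  # expect 200 (success_status) when the upload completes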
Here is a screenshot with the result of the actual run: