One process (out of four) keeps running during an S3 file upload using multipart upload and multiprocessing (apply_async) in Python

I am using multiprocessing (apply_async) to upload several parts of a file to S3 in parallel with the S3 multipart upload feature. The steps are:

  1. Generate a multipart upload id (mpu_id).
  2. Split the 2 GB file into 512 MB chunks and upload all of them in parallel using multiprocessing.
  3. Complete the multipart upload.

The problem shows up in step 2. Since the machine has 4 CPUs, three of the processes usually finish quickly (1-2 minutes), but one of them takes far longer to complete, around 10-15 minutes.
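For reference, the three steps above map onto the following boto3 calls (a minimal single-process sketch with an illustrative helper name and part size, not the parallel implementation shown below):

import math
import os

import boto3

def multipart_upload_sketch(local_path, bucket, key, part_size=512 * 1024 * 1024):
    client = boto3.client("s3")

    # Step 1: create the multipart upload and remember its UploadId.
    mpu_id = client.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]

    # Step 2: upload the file in part_size chunks (sequentially in this sketch).
    file_size = os.stat(local_path).st_size
    parts = []
    with open(local_path, "rb") as f:
        for part_num in range(1, int(math.ceil(file_size / float(part_size))) + 1):
            data = f.read(part_size)
            resp = client.upload_part(Body=data, Bucket=bucket, Key=key,
                                      UploadId=mpu_id, PartNumber=part_num)
            parts.append({"PartNumber": part_num, "ETag": resp["ETag"]})

    # Step 3: complete the upload by passing every part's number and ETag.
    return client.complete_multipart_upload(
        Bucket=bucket, Key=key, UploadId=mpu_id,
        MultipartUpload={"Parts": parts})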

Below is the sample code:

import math
import os
from multiprocessing import Pool

from filechunkio import FileChunkIO

# client (a boto3 S3 client), logger, as_percent, Errors, FileUploads and the
# constants file_part_size, region, one_mb_bytes and success_status are defined
# elsewhere in the module (not shown here).

##############################################################################
## _upload is called to upload the different parts of the file to S3 in     ##
## parallel via apply_async from multiprocessing.                           ##
##############################################################################
def _upload(local_path, bucket, key, file_size, mpu_id, part_num, offset, bytes):

    with FileChunkIO(local_path, 'rb', offset=offset, bytes=bytes) as data:
        print(offset, bytes, data)
        part = client.upload_part(
            Body=data, Bucket=bucket, Key=key, UploadId=mpu_id, PartNumber=part_num)
        logger.info("{0} of {1} uploaded ({2:.3f}%) successfully - Part number : {3} ETag: {4} Status code : {5}".format(
            offset + bytes, file_size,
            as_percent(offset + bytes, file_size), part_num, part['ETag'], part["ResponseMetadata"]["HTTPStatusCode"]))
        if part["ResponseMetadata"]["HTTPStatusCode"] != success_status:
            logger.warning(f"Part upload failed for below:"
                           f"\nPartNumber  : {part_num}"
                           f"\nETag        : {part['ETag']}"
                           f"\nUploadId    : {mpu_id}"
                           f"\nBucket      : {bucket}"
                           f"\nkey         : {key}")
    return part, part_num

## Callback function to collect the result of each multiprocessing call to _upload.
parts = []
def mycallback(x):
    parts.append({"PartNumber": x[1], "ETag": x[0]['ETag']})

class S3MultipartUpload(FileUploads):

    def __init__(self,
                 path,
                 bucket,
                 part_size=file_part_size,
                 profile_name=None,
                 region_name=region,
                 verbose=False):
        self.total_bytes = os.stat(path).st_size
        self.part_bytes = part_size
        FileUploads.__init__(self, path, bucket)

    ## The create function generates the multipart upload id (mpu_id) for further processing.
    def create(self):
        logger.info("Executing create function to generate Multi part upload id.")
        try:
            mpu = client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            mpu_id = mpu["UploadId"]
        except Exception as e:
            logger.error("An error occurred while creating the multipart upload id.", trace=True)
            Errors(e, logger=logger).errorrun()
        logger.info(f"Multipart upload id generated for key {self.key} in bucket {self.bucket} : {mpu_id} ")
        return mpu_id

    ## The upload function splits the file into parts, uploads each part to S3 and records its ETag.
    def upload(self, mpu_id, local_path, file_size,bucket,key):
        logger.info(f"Executing upload function to divide the file into chunks of {self.part_bytes/one_mb_bytes} MB and then uploading it to S3.")
        try:
            logger.info(f"Size of the file : {file_size/one_mb_bytes} MB")
            bytes_per_chunk = file_part_size
            logger.info(f"Size of each part after dividing the file : {bytes_per_chunk/one_mb_bytes} MB")
            chunk_amount = int(math.ceil(file_size / float(bytes_per_chunk)))
            logger.info(f"Number of parts created after dividing the file : {chunk_amount}")
            pool = Pool(processes=None)
            results = list()
            logger.info("Parallel execution started for uploading file parts to S3.")
            for i in range(chunk_amount):
                offset = i * bytes_per_chunk
                remaining_bytes = file_size - offset
                bytes = min([bytes_per_chunk, remaining_bytes])
                part_num = i + 1
                r = pool.apply_async(_upload, [local_path, bucket, key, file_size, mpu_id, part_num, offset, bytes], callback=mycallback)
                #print(r.get())
            pool.close()
            pool.join()
            logger.info("Parallel execution completed and pool is closed.")
        except Exception as e:
            logger.error("An error occurred while uploading parts to S3.", trace=True)
            Errors(e, logger=logger).errorrun()
        logger.info("Upload process got completed.")
        return parts

    ## The complete function merges all the parts into a single object in S3 using the mpu_id and each part's ETag.
    def complete(self, mpu_id, parts):
        logger.info("Executing Multipart upload completion function to combine all the parts into one in S3.")
        try:
            parts_sorted = sorted(parts, key = lambda i: (i['PartNumber'], i['ETag']))
            result = client.complete_multipart_upload(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=mpu_id,
                MultipartUpload={"Parts": parts_sorted})
        except Exception as e:
            logger.error("An error occurred while uploading parts to S3.", trace=True)
            Errors(e, logger=logger).errorrun()
        logger.info(f"Checking status code for after completion of multipart upload : {result['ResponseMetadata']['HTTPStatusCode']}")
        return result

    def multi_part_upload(self, local_path,file_size):
        mpu_id = self.create() 
        parts  = self.upload(mpu_id,local_path,file_size,self.bucket,self.key)
        result = self.complete(mpu_id, parts)
        status_code = result["ResponseMetadata"]["HTTPStatusCode"]
        if status_code == success_status:
            logger.info("Multipart upload was closed and completed successfully.")
        else:
            logger.warning("Multipart upload was not completed successfully.")
        return status_code
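
For completeness, the class above would be driven roughly like this (a hypothetical driver with an illustrative path and bucket name; FileUploads is assumed to set self.key from the constructor arguments):

local_path = "/tmp/big_file.dat"   # illustrative path, ~2 GB in this scenario
file_size = os.stat(local_path).st_size

uploader = S3MultipartUpload(local_path, bucket="my-example-bucket")
status_code = uploader.multi_part_upload(local_path, file_size)
print(f"Multipart upload finished with HTTP status {status_code}")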

Here is a screenshot of the result

...