Gzip распаковать из одного ведра s3 в другое - PullRequest
0 голосов
/ 01 апреля 2020

У меня куча gz файлов в папке на s3. Я пытаюсь разархивировать файлы параллельно и выгрузить разархивированный контент gz в другую корзину s3.

Я пытаюсь использовать многопроцессорность в python, и вот что я пробовал

У меня есть рабочая функция, как показано ниже

def worker_function(objnm,source_bucket,source_folder_location,target_bucket,target_folder_location) :
    filename = objnm.split('/')[-1].replace("GZ.001","txt")
    print("target_file :---" + filename)
    target=target_folder_location + filename
    print("file copying to destination :-- " + target)
    boto3.client('s3').upload_fileobj(Fileobj=gzip.GzipFile(None, 'rb', fileobj=BytesIO(client.get_object(Bucket=source_bucket, Key=key)['Body'].read())), Bucket=target_bucket, Key=target)

Эта функция распаковывает файл gz и записывает его в корзину s3. И тогда у меня есть блок if, как показано ниже

if __name__ == "__main__":

    client=boto3.client('s3') 

    source_bucket=sys.argv[1]
    source_folder_location=sys.argv[2]
    target_bucket=sys.argv[3]
    target_folder_location=sys.argv[4]

    objlist = client.list_objects(Bucket=source_bucket,Prefix=source_folder_location)['Contents']
    for object_summary in objlist:
        key = object_summary["Key"]
        print(key)

        if (".GZ" in key):
            p = multiprocessing.Process(target=worker_function, args=(key, source_bucket, source_folder_location, target_bucket, target_folder_location))
            p.start()

Но если я запускаю этот код, я получаю ошибку ниже

Process Process-16:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File ".py", line 31, in worker_function
    boto3.client('s3').upload_fileobj(Fileobj=gzip.GzipFile(None, 'rb', fileobj=BytesIO(client.get_object(Bucket=source_bucket, Key=key)['Body'].read())), Bucket=target_bucket, Key=target)
  File "/usr/local/lib/python3.6/site-packages/botocore/response.py", line 78, in read
    chunk = self._raw_stream.read(amt)
  File "/usr/local/lib/python3.6/site-packages/urllib3/response.py", line 503, in read
    data = self._fp.read() if not fp_closed else b""
  File "/usr/lib64/python3.6/http/client.py", line 472, in read
    s = self._safe_read(self.length)
  File "/usr/lib64/python3.6/http/client.py", line 627, in _safe_read
    return b"".join(s)
MemoryError

Я пытаюсь понять, что это за ошибка значит и как я могу это решить. Я запускаю это на огромном узле с 95 ядрами. Может кто-нибудь, пожалуйста, помогите мне. Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...