У меня куча gz файлов в папке на s3. Я пытаюсь разархивировать файлы параллельно и выгрузить разархивированный контент gz в другую корзину s3.
Я пытаюсь использовать многопроцессорность в python, и вот что я пробовал
У меня есть рабочая функция, как показано ниже
def worker_function(objnm,source_bucket,source_folder_location,target_bucket,target_folder_location) :
filename = objnm.split('/')[-1].replace("GZ.001","txt")
print("target_file :---" + filename)
target=target_folder_location + filename
print("file copying to destination :-- " + target)
boto3.client('s3').upload_fileobj(Fileobj=gzip.GzipFile(None, 'rb', fileobj=BytesIO(client.get_object(Bucket=source_bucket, Key=key)['Body'].read())), Bucket=target_bucket, Key=target)
Эта функция распаковывает файл gz и записывает его в корзину s3. И тогда у меня есть блок if, как показано ниже
if __name__ == "__main__":
client=boto3.client('s3')
source_bucket=sys.argv[1]
source_folder_location=sys.argv[2]
target_bucket=sys.argv[3]
target_folder_location=sys.argv[4]
objlist = client.list_objects(Bucket=source_bucket,Prefix=source_folder_location)['Contents']
for object_summary in objlist:
key = object_summary["Key"]
print(key)
if (".GZ" in key):
p = multiprocessing.Process(target=worker_function, args=(key, source_bucket, source_folder_location, target_bucket, target_folder_location))
p.start()
Но если я запускаю этот код, я получаю ошибку ниже
Process Process-16:
Traceback (most recent call last):
File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File ".py", line 31, in worker_function
boto3.client('s3').upload_fileobj(Fileobj=gzip.GzipFile(None, 'rb', fileobj=BytesIO(client.get_object(Bucket=source_bucket, Key=key)['Body'].read())), Bucket=target_bucket, Key=target)
File "/usr/local/lib/python3.6/site-packages/botocore/response.py", line 78, in read
chunk = self._raw_stream.read(amt)
File "/usr/local/lib/python3.6/site-packages/urllib3/response.py", line 503, in read
data = self._fp.read() if not fp_closed else b""
File "/usr/lib64/python3.6/http/client.py", line 472, in read
s = self._safe_read(self.length)
File "/usr/lib64/python3.6/http/client.py", line 627, in _safe_read
return b"".join(s)
MemoryError
Я пытаюсь понять, что это за ошибка значит и как я могу это решить. Я запускаю это на огромном узле с 95 ядрами. Может кто-нибудь, пожалуйста, помогите мне. Спасибо