I need help with the code below, which I am testing to uncompress .gz files in S3 in parallel and upload them back, so that decompression and upload happen concurrently. The goal is to cut the run time by decompressing the files under a given S3 prefix in parallel; today this runs sequentially using the same decompression method (a simplified sketch of that sequential version follows the code).
import gzip
import os
import re
from io import BytesIO
from multiprocessing import Process, Queue

import boto3
bucket_name = "bucket_1"
bucket_subpath = "customer"
files_subpath = "customer_files"
search_string = f"^{bucket_subpath}/{files_subpath}/.*gz"
prefix = f"{bucket_subpath}/{files_subpath}/"

q = Queue()
## function to uncompress a single object in a worker process
def hello(keyString, uncomp_keyString):
    pid = os.getpid()
    q.put(pid)  # recorded for the parent to drain after join()
    print(f"ProcessID: {pid}")
    print(f"file: {keyString}")
    ## fetch the compressed object
    response = client.get_object(Bucket=bucket_name, Key=keyString)
    print(f"response: {response}")
    body = response["Body"].read()
    ## wrap the raw bytes in a streaming decompressor
    Fileobj = gzip.GzipFile(fileobj=BytesIO(body), mode="rb")
    print(f"Uploading to s3: {Fileobj}")
    client.upload_fileobj(Fileobj=Fileobj, Bucket=bucket_name, Key=uncomp_keyString)
processes = []
client = boto3.client("s3", use_ssl=False)
paginator = client.get_paginator("list_objects_v2")
result = paginator.paginate(Bucket=bucket_name, Delimiter="/", Prefix=prefix)
for page in result:
    if "Contents" in page:
        for key in page["Contents"]:
            keyString = key["Key"]
            print(f"Original file: {keyString}")
            ## keep only the .gz files under the prefix and hand them off
            if re.search(search_string, keyString):
                print(f"filtered file: {keyString}")
                uncomp_keyString = keyString.replace(".gz", ".txt")
                print(f"STARTED uncompressing file - {keyString}")
                t = Process(target=hello, args=(keyString, uncomp_keyString))
                processes.append(t)
                t.start()
for one_process in processes:
    one_process.join()

## drain the PIDs the workers recorded
mylist = []
while not q.empty():
    mylist.append(q.get())

print("Done!")
print(len(mylist))
print(mylist)
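For reference, the sequential version I am trying to speed up calls the same decompression method inline, one key at a time (a simplified sketch of my existing script, reusing the names defined above):

## Simplified sketch of the current sequential run: the same
## get_object -> decompress -> upload steps, one key at a time.
for page in paginator.paginate(Bucket=bucket_name, Delimiter="/", Prefix=prefix):
    for key in page.get("Contents", []):
        keyString = key["Key"]
        if re.search(search_string, keyString):
            hello(keyString, keyString.replace(".gz", ".txt"))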
As a starting point, I have two .gz files in the S3 location. Sometimes the run fails on the first file with the error below, while the second file is uncompressed successfully.
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "parallel_decomp.py", line 52, in hello
client.upload_fileobj(Fileobj=Fileobj,Bucket=bucket_name,Key=uncomp_keyString)
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/boto3/s3/inject.py", line 539, in upload_fileobj
return future.result()
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
return self._coordinator.result()
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
raise self._exception
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/tasks.py", line 255, in _main
self._submit(transfer_future=transfer_future, **kwargs)
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 549, in _submit
upload_input_manager.provide_transfer_size(transfer_future)
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 324, in provide_transfer_size
fileobj.seek(0, 2)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 379, in seek
return self._buffer.seek(offset, whence)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_compression.py", line 129, in seek
while self.read(io.DEFAULT_BUFFER_SIZE):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 474, in read
if not self._read_gzip_header():
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 422, in _read_gzip_header
raise OSError('Not a gzipped file (%r)' % magic)
OSError: Not a gzipped file (b'\x05i')
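The fileobj.seek(0, 2) frame shows s3transfer seeking to the end of the file object to work out the transfer size, and seeking a GzipFile forces it to decompress the whole stream on the spot, which is where the bad bytes surface. One variation I am considering (untested) is decompressing fully into memory first, so that upload_fileobj gets a plain, seekable BytesIO; this assumes the uncompressed files fit in memory:

## Untested variation: decompress the whole object up front so the
## uploaded file object is a plain, seekable BytesIO instead of a
## streaming GzipFile. Assumes the uncompressed file fits in memory.
def hello(keyString, uncomp_keyString):
    response = client.get_object(Bucket=bucket_name, Key=keyString)
    uncompressed = gzip.decompress(response["Body"].read())
    client.upload_fileobj(Fileobj=BytesIO(uncompressed),
                          Bucket=bucket_name,
                          Key=uncomp_keyString)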
Other times the second file fails with the error below, while the first file is uncompressed successfully.
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "parallel_decomp.py", line 51, in hello
client.upload_fileobj(Fileobj=Fileobj,Bucket=bucket_name,Key=uncomp_keyString)
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/boto3/s3/inject.py", line 539, in upload_fileobj
return future.result()
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
return self._coordinator.result()
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
raise self._exception
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/tasks.py", line 255, in _main
self._submit(transfer_future=transfer_future, **kwargs)
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 549, in _submit
upload_input_manager.provide_transfer_size(transfer_future)
File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 324, in provide_transfer_size
fileobj.seek(0, 2)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 379, in seek
return self._buffer.seek(offset, whence)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_compression.py", line 129, in seek
while self.read(io.DEFAULT_BUFFER_SIZE):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 482, in read
uncompress = self._decompressor.decompress(buf, size)
zlib.error: Error -3 while decompressing data: invalid distance too far back
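Since the failure alternates between the two files and both errors point at corrupted compressed bytes, I also wonder about the module-level client: the tracebacks show Python 3.7 on macOS, where multiprocessing still defaults to the fork start method, so both children inherit the parent's boto3 client and its connection pool, and the boto3 docs recommend a separate session/client per process. A sketch of the per-process client I plan to try:

## Sketch: each worker builds its own session and client instead of
## inheriting the parent's client (and its pooled connections) via fork.
def hello(keyString, uncomp_keyString):
    s3 = boto3.session.Session().client("s3", use_ssl=False)
    response = s3.get_object(Bucket=bucket_name, Key=keyString)
    uncompressed = gzip.decompress(response["Body"].read())
    s3.upload_fileobj(Fileobj=BytesIO(uncompressed),
                      Bucket=bucket_name,
                      Key=uncomp_keyString)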
No run ever completes with both files succeeding; I always get one of the two errors.
Any help is appreciated. Many thanks in advance.