Problem decompressing .gz files in S3 and uploading them back with multiprocessing
0 votes
/ 07 August 2020

I need help with the code below, which I am testing to decompress .gz files in S3 in parallel and upload them back, so that decompression and upload happen concurrently. The goal is to cut the run time by decompressing the files at a given S3 location in parallel; today this is done sequentially, using the same decompression method.
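For context, the sequential flow I'm replacing is roughly equivalent to this sketch (not the exact script; uncompress_all_serial is just an illustrative name, and bucket/prefix are the same placeholders as in the parallel version). It lists the keys, decompresses each object fully in memory, and uploads the plain bytes one at a time:

import gzip
import re
from io import BytesIO

import boto3

client = boto3.client('s3')

def uncompress_all_serial(bucket, prefix, pattern):
    paginator = client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if not re.search(pattern, key):
                continue
            ## download, decompress in memory, upload the plain text
            body = client.get_object(Bucket=bucket, Key=key)['Body'].read()
            client.upload_fileobj(Fileobj=BytesIO(gzip.decompress(body)),
                                  Bucket=bucket, Key=key.replace('.gz', '.txt'))

The parallel version I'm testing is below: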

import os
import re
import gzip
from io import BytesIO
from multiprocessing import Process, Queue

import boto3


bucket_name = "bucket_1"
bucket_subpath = "customer"
files_subpath = "customer_files"

search_string = "^" + str(bucket_subpath) + "/" + str(files_subpath) + "/.*gz"
prefix = str(bucket_subpath) + "/" + str(files_subpath) + "/"

q = Queue()

## worker: download one .gz object, decompress it, and upload the result
def hello(keyString, uncomp_keyString):
    q.put(os.getpid())
    print(f"ProcessID: {q.get()}")
    print(f"file: {keyString}")
    ## fetch the object and read all of the compressed bytes
    response = client.get_object(Bucket=bucket_name, Key=keyString)
    print(f"response: {response}")
    body = response['Body'].read()
    print(f"downloaded: {keyString}")
    ## wrap the bytes in a streaming gzip reader for the upload
    Fileobj = gzip.GzipFile(None, 'rb', fileobj=BytesIO(body))
    print(f"Fileobj: {Fileobj}")
    print(f"Uploading to s3: {Fileobj}")
    client.upload_fileobj(Fileobj=Fileobj, Bucket=bucket_name, Key=uncomp_keyString)

processes = []
client = boto3.client('s3', use_ssl=False)
paginator = client.get_paginator('list_objects_v2')
## page through the objects under the prefix in the same bucket
result = paginator.paginate(Bucket=bucket_name, Delimiter='/', Prefix=prefix)
for page in result:
    if "Contents" in page:
        for key in page["Contents"]:
            keyString = key["Key"]
            print(f"Original file: {keyString}")
            ## keep only the .gz files under the prefix and hand each to a worker
            x = re.search(search_string, keyString)
            if x:
                ## matched
                print(f"filtered file: {keyString}")
                uncomp_keyString = keyString.replace(".gz", ".txt")
                print("STARTED uncompressing file - " + str(keyString))
                t = Process(target=hello, args=(keyString, uncomp_keyString))
                print("process_append")
                processes.append(t)
                print("process_start")
                t.start()

for one_process in processes:
    one_process.join()

mylist = []
while not q.empty():
    mylist.append(q.get())

print("Done!")
print(len(mylist))
print(mylist)

As a starting point, I have two .gz files in the S3 location.

Sometimes the first file fails with the error below, while the second file is decompressed and uploaded successfully.

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "parallel_decomp.py", line 52, in hello
    client.upload_fileobj(Fileobj=Fileobj,Bucket=bucket_name,Key=uncomp_keyString)
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/boto3/s3/inject.py", line 539, in upload_fileobj
    return future.result()
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
    return self._coordinator.result()
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
    raise self._exception
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/tasks.py", line 255, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 549, in _submit
    upload_input_manager.provide_transfer_size(transfer_future)
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 324, in provide_transfer_size
    fileobj.seek(0, 2)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 379, in seek
    return self._buffer.seek(offset, whence)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_compression.py", line 129, in seek
    while self.read(io.DEFAULT_BUFFER_SIZE):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 474, in read
    if not self._read_gzip_header():
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 422, in _read_gzip_header
    raise OSError('Not a gzipped file (%r)' % magic)
OSError: Not a gzipped file (b'\x05i')
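For reference, a valid gzip stream begins with the magic bytes b'\x1f\x8b', so the b'\x05i' in the traceback means the worker received bytes from somewhere past the start of the object, not that the file itself is bad. A quick standalone check like this (sketch, reusing bucket_name and one of the keys from above) can confirm whether the objects are intact when downloaded serially:

import boto3

client = boto3.client('s3')
head = client.get_object(Bucket=bucket_name, Key=keyString)['Body'].read(2)
print(head, head == b'\x1f\x8b')  ## b'\x1f\x8b' is the gzip header magic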

Other times the second file fails with the error below, while the first file is decompressed and uploaded successfully.

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "parallel_decomp.py", line 51, in hello
    client.upload_fileobj(Fileobj=Fileobj,Bucket=bucket_name,Key=uncomp_keyString)
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/boto3/s3/inject.py", line 539, in upload_fileobj
    return future.result()
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
    return self._coordinator.result()
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
    raise self._exception
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/tasks.py", line 255, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 549, in _submit
    upload_input_manager.provide_transfer_size(transfer_future)
  File "/Users/jatinder.luthra/Desktop/python/venv-p37/lib/python3.7/site-packages/s3transfer/upload.py", line 324, in provide_transfer_size
    fileobj.seek(0, 2)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 379, in seek
    return self._buffer.seek(offset, whence)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/_compression.py", line 129, in seek
    while self.read(io.DEFAULT_BUFFER_SIZE):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/gzip.py", line 482, in read
    uncompress = self._decompressor.decompress(buf, size)
zlib.error: Error -3 while decompressing data: invalid distance too far back

No run ever completes successfully; I always get one of the two errors.
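My current suspicion (an assumption on my part, based on the boto3 guidance that clients should not be shared across processes) is that the module-level client is inherited by the forked workers, so both processes end up reading interleaved bytes from the same connection. A reworked worker that creates its own client, and decompresses into a seekable buffer before uploading, would look like this sketch:

import gzip
from io import BytesIO

import boto3

def hello(keyString, uncomp_keyString):
    ## fresh client per process instead of the shared module-level one
    client = boto3.client('s3', use_ssl=False)
    body = client.get_object(Bucket=bucket_name, Key=keyString)['Body'].read()
    ## fully decompress into a seekable BytesIO so upload_fileobj can
    ## measure its size without re-reading the gzip stream
    plain = BytesIO(gzip.decompress(body))
    client.upload_fileobj(Fileobj=plain, Bucket=bucket_name, Key=uncomp_keyString)

Is a client per process the right direction here, or is the problem elsewhere?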

Any help is appreciated. Many thanks in advance.
