Потоковая передача CSV из S3, строки tar в виде файлов, шифрование, потоковая передача обратно в S3 с использованием python - PullRequest
0 голосов
/ 18 июня 2020

Я пытаюсь прочитать большой CSV из S3, который мне нужно переформатировать в файлы (по одному на строку). Я хотел бы сохранить его как единый упакованный архив (например, tar) и не создавать отдельные файлы (будет слишком много маленьких файлов). Архив также должен быть зашифрован (симметрично c, только файлы, защищенные парольной фразой).

По сути, как псевдоканал: S3 CSV.gz | GUNZIP | ТАР | ШИФРОВАТЬ | S3 TAR.EN C

Теоретически это должно быть возможно: все шаги являются потоковыми, но мне трудно заставить его работать как свободный поток.

Первый рабочий пример это просто tar, без шифрования; это функционально; Я вижу, как он загружает файл результатов по частям, а на панели управления EC2 отображается хороший постоянный вход / выход. Я использую smart_open, поскольку он генерирует файловый объект для записи с использованием .open (), boto, насколько мне известно, не работает (вы должны передать bytesio, и вам нужно перемотать bytesio перед загрузкой). Я использую соединение boto для чтения, поскольку smart_open иногда выдает ошибки (пустые / ошибочные строки):

import boto3
import botocore
import tarfile
from io import BytesIO
from pretty_bad_protocol import gnupg
import smart_open
import pyAesCrypt

src_bucket='srcbucket'
src_prefix='path/to/csvfile.csv'
dst_bucket='dstbucket'
dst_prefix='path/to/output.tar'
passphrase='encryptionpassword'

s3_client = boto3.client('s3')

with smart_open.open('s3://'+dst_bucket+'/'+dst_prefix,'wb') as fout:
    with tarfile.open(mode = "w:gz", fileobj = fout) as tar:
        response=s3_client.get_object(Bucket=src_bucket, Key=src_prefix) 
        gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
        reader = csv.reader(gzipped)
        for i, line in enumerate(reader, start=1):
            str_write=line[1].encode('utf8')
            file = BytesIO(str_write)
            info = tarfile.TarInfo(name=line[0]+".txt")
            info.size = len(str_write)
            tar.addfile(tarinfo=info, fileobj=file)

Чтобы записать зашифрованный результат, afaik нет подобных методов .open () для доступного шифрования, поэтому мы должны сначала передать bytesIO и перемотать его:

bufferSize = 64 * 1024
with smart_open.open('s3://'+dst_bucket+'/'+dst_prefix,'wb') as fout:
    tar_out_bytes = BytesIO()
    with tarfile.open(mode = "w:gz", fileobj = tar_out_bytes) as tar:
        response=s3_client.get_object(Bucket=src_bucket, Key=src_prefix) 
        gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
        reader = csv.reader(gzipped)
        for i, line in enumerate(reader, start=1):


    with smart_open.open('s3://'+src_bucket+'/'+dst_prefix, 'r') as fin:
        tar_out_bytes = BytesIO()
        with tarfile.open(mode = "w:gz", fileobj = tar_out_bytes) as tar:
            reader = csv.reader(fin)
            for i, line in enumerate(reader, start=1):
                str_write=line[1].encode('utf8')
                file = BytesIO(str_write)
                info = tarfile.TarInfo(name=line[0]+".txt")
                info.size = len(str_write)
                tar.addfile(tarinfo=info, fileobj=file)
            tar_out_bytes.seek(0)                  
            pyAesCrypt.encryptStream(tar_out_bytes, fout, passphrase, bufferSize)

, к сожалению, это создает файл, который я не могу расшифровать, но он также шифруется только после завершения tar, т.е. не шифрует, пока создается rar.

Использовать GPG еще сложнее, но он создает файл, с которым мы можем работать. Вы не можете напрямую передать bytesIO в GPG для записи, но вы можете получить bytesio из результата gpg.encrypt; это означает, что мы должны передать этот bytesio обратно и направить его на S3 (используя вместо этого boto):

gpg = gnupg.GPG()
with smart_open.open('s3://'+src_bucket+'/'+dst_prefix, 'r') as fin:
    tar_out_bytes = BytesIO()
    with tarfile.open(mode = "w:gz", fileobj = tar_out_bytes) as tar:
        reader = csv.reader(fin)
        for i, line in enumerate(reader, start=1):
            str_write=line[1].encode('utf8')
            file = BytesIO(str_write)
            info = tarfile.TarInfo(name=line[0]+".txt")
            info.size = len(str_write)
            tar.addfile(tarinfo=info, fileobj=file)
        tar.close()
        tar_out_bytes.seek(0)
        encrypted_data=gpg.encrypt(tar_out_bytes, symmetric='AES256',passphrase=passphrase,armor=False,encrypt=False)
        gpg_out_bytes = BytesIO(encrypted_data.data)
        gpg_out_bytes.seek(0)
        # use boto to write the gpg_out_bytes to s3
        response = s3_client.upload_fileobj(gpg_out_bytes,dst_bucket,dst_prefix

Это работает, но: он также сначала создаст tar, затем зашифрует tar, и только потом начинайте закачку. Потоковая передача работает не так, и это неоптимальное использование ресурсов.

Возможно ли вообще достичь такого рода плавной потоковой передачи с созданием архива tar и шифрованием на go?

Я бы псевдо хотел бы сделать следующее: with encrypter.open(fileobj=<filelike>,passphrase="") as f_encrypt: так что мы можем передать f_encrypt как fileobj в tar.open. Можно ли это сделать? Я просмотрел cryptfile , но, к сожалению, он не работает с записью в smart_open, поскольку ему нужен произвольный доступ (file.seek ()), поэтому он не подходит для потоковой передачи обратно на S3. Что еще более важно, он создает зашифрованные файлы, которые могут быть расшифрованы только с помощью cryptfile, а не, например, openssl или другими распространенными инструментами.

Я не привязан к GPG или AES, моя главная забота о шифровании заключается в том, инструменты, доступные третьим сторонам для расшифровки файлов с использованием выбранной платформы (т. е. общедоступные инструменты, например gpg, удовлетворяют этому).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...