Я пытаюсь прочитать большой CSV из S3, который мне нужно переформатировать в файлы (по одному на строку). Я хотел бы сохранить его как единый упакованный архив (например, tar) и не создавать отдельные файлы (будет слишком много маленьких файлов). Архив также должен быть зашифрован (симметрично c, только файлы, защищенные парольной фразой).
По сути, как псевдоканал: S3 CSV.gz | GUNZIP | ТАР | ШИФРОВАТЬ | S3 TAR.EN C
Теоретически это должно быть возможно: все шаги являются потоковыми, но мне трудно заставить его работать как свободный поток.
Первый рабочий пример это просто tar, без шифрования; это функционально; Я вижу, как он загружает файл результатов по частям, а на панели управления EC2 отображается хороший постоянный вход / выход. Я использую smart_open, поскольку он генерирует файловый объект для записи с использованием .open (), boto, насколько мне известно, не работает (вы должны передать bytesio, и вам нужно перемотать bytesio перед загрузкой). Я использую соединение boto для чтения, поскольку smart_open иногда выдает ошибки (пустые / ошибочные строки):
import boto3
import botocore
import tarfile
from io import BytesIO
from pretty_bad_protocol import gnupg
import smart_open
import pyAesCrypt
src_bucket='srcbucket'
src_prefix='path/to/csvfile.csv'
dst_bucket='dstbucket'
dst_prefix='path/to/output.tar'
passphrase='encryptionpassword'
s3_client = boto3.client('s3')
with smart_open.open('s3://'+dst_bucket+'/'+dst_prefix,'wb') as fout:
with tarfile.open(mode = "w:gz", fileobj = fout) as tar:
response=s3_client.get_object(Bucket=src_bucket, Key=src_prefix)
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
reader = csv.reader(gzipped)
for i, line in enumerate(reader, start=1):
str_write=line[1].encode('utf8')
file = BytesIO(str_write)
info = tarfile.TarInfo(name=line[0]+".txt")
info.size = len(str_write)
tar.addfile(tarinfo=info, fileobj=file)
Чтобы записать зашифрованный результат, afaik нет подобных методов .open () для доступного шифрования, поэтому мы должны сначала передать bytesIO и перемотать его:
bufferSize = 64 * 1024
with smart_open.open('s3://'+dst_bucket+'/'+dst_prefix,'wb') as fout:
tar_out_bytes = BytesIO()
with tarfile.open(mode = "w:gz", fileobj = tar_out_bytes) as tar:
response=s3_client.get_object(Bucket=src_bucket, Key=src_prefix)
gzipped = GzipFile(None, 'rb', fileobj=response['Body'])
reader = csv.reader(gzipped)
for i, line in enumerate(reader, start=1):
with smart_open.open('s3://'+src_bucket+'/'+dst_prefix, 'r') as fin:
tar_out_bytes = BytesIO()
with tarfile.open(mode = "w:gz", fileobj = tar_out_bytes) as tar:
reader = csv.reader(fin)
for i, line in enumerate(reader, start=1):
str_write=line[1].encode('utf8')
file = BytesIO(str_write)
info = tarfile.TarInfo(name=line[0]+".txt")
info.size = len(str_write)
tar.addfile(tarinfo=info, fileobj=file)
tar_out_bytes.seek(0)
pyAesCrypt.encryptStream(tar_out_bytes, fout, passphrase, bufferSize)
, к сожалению, это создает файл, который я не могу расшифровать, но он также шифруется только после завершения tar, т.е. не шифрует, пока создается rar.
Использовать GPG еще сложнее, но он создает файл, с которым мы можем работать. Вы не можете напрямую передать bytesIO в GPG для записи, но вы можете получить bytesio из результата gpg.encrypt; это означает, что мы должны передать этот bytesio обратно и направить его на S3 (используя вместо этого boto):
gpg = gnupg.GPG()
with smart_open.open('s3://'+src_bucket+'/'+dst_prefix, 'r') as fin:
tar_out_bytes = BytesIO()
with tarfile.open(mode = "w:gz", fileobj = tar_out_bytes) as tar:
reader = csv.reader(fin)
for i, line in enumerate(reader, start=1):
str_write=line[1].encode('utf8')
file = BytesIO(str_write)
info = tarfile.TarInfo(name=line[0]+".txt")
info.size = len(str_write)
tar.addfile(tarinfo=info, fileobj=file)
tar.close()
tar_out_bytes.seek(0)
encrypted_data=gpg.encrypt(tar_out_bytes, symmetric='AES256',passphrase=passphrase,armor=False,encrypt=False)
gpg_out_bytes = BytesIO(encrypted_data.data)
gpg_out_bytes.seek(0)
# use boto to write the gpg_out_bytes to s3
response = s3_client.upload_fileobj(gpg_out_bytes,dst_bucket,dst_prefix
Это работает, но: он также сначала создаст tar, затем зашифрует tar, и только потом начинайте закачку. Потоковая передача работает не так, и это неоптимальное использование ресурсов.
Возможно ли вообще достичь такого рода плавной потоковой передачи с созданием архива tar и шифрованием на go?
Я бы псевдо хотел бы сделать следующее: with encrypter.open(fileobj=<filelike>,passphrase="") as f_encrypt:
так что мы можем передать f_encrypt как fileobj в tar.open. Можно ли это сделать? Я просмотрел cryptfile , но, к сожалению, он не работает с записью в smart_open, поскольку ему нужен произвольный доступ (file.seek ()), поэтому он не подходит для потоковой передачи обратно на S3. Что еще более важно, он создает зашифрованные файлы, которые могут быть расшифрованы только с помощью cryptfile, а не, например, openssl или другими распространенными инструментами.
Я не привязан к GPG или AES, моя главная забота о шифровании заключается в том, инструменты, доступные третьим сторонам для расшифровки файлов с использованием выбранной платформы (т. е. общедоступные инструменты, например gpg, удовлетворяют этому).