Потоковая передача / чанки csv из S3 в Python - PullRequest
0 голосов
/ 28 июня 2018

Я намерен выполнить некоторые операции с большим объемом памяти над очень большим CSV-файлом, хранящимся в S3, с использованием Python, чтобы переместить скрипт в AWS Lambda. Я знаю, что могу читать во всей памяти csv nto, но я определенно столкнусь с ограничениями памяти и памяти Lambda с таким большим файлом, есть ли какой-нибудь способ для потоковой передачи или просто чтения кусков csv за раз в Python, используя boto3 / botocore, в идеале, указав номера строк для чтения?

Вот некоторые вещи, которые я уже пробовал:

1) использование параметра range в S3.get_object для указания диапазона байтов для чтения. К сожалению, это означает, что последние строки обрезаются посередине, поскольку нет способа указать количество строк для чтения в Существуют некоторые грязные обходные пути, такие как сканирование последнего символа новой строки, запись индекса, а затем использование его в качестве начальной точки для следующего диапазона байтов, но я бы хотел, если возможно, избежать этого неуклюжего решения.

2) Использование S3 select для написания sql запросов для выборочного извлечения данных из сегментов S3. К сожалению, функция row_numbers SQL не поддерживается, и не похоже, что есть способ чтения в подмножестве строк.

1 Ответ

0 голосов
/ 02 июля 2018

Если ваш файл не сжат, это должно включать чтение из потока и разбиение на символ новой строки. Чтение фрагмента данных, поиск последнего экземпляра символа новой строки в этом фрагменте, разбиение и обработка.

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data with (bytes, not string)
newline = '\n'.encode()   
partial_chunk = b''

while (True):
    chunk = partial_chunk + body.read(chunk_size)

    # If nothing was read there is nothing to process
    if chunk == b'':
        break

    last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[0:last_newline+1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline+1:]

Если у вас есть сжатые файлы, то вам нужно использовать BytesIO и класс GzipFile внутри цикла; это сложнее, потому что вам нужно сохранить детали сжатия Gzip.

...