Чтение и анализ файла CSV в S3 без загрузки всего файла с использованием Python - PullRequest
0 голосов
/ 15 февраля 2019

Итак, я хочу прочитать большой CSV-файл из корзины S3, но я не хочу, чтобы этот файл был полностью загружен в память, я хочу каким-то образом передать его потоком в виде фрагментов и затем обработать его.

Пока что это то, что я сделал, но я не думаю, что это решит проблему.

import logging
import boto3
import codecs
import os
import csv

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

s3 = boto3.client('s3')


def lambda_handler(event, context):
    # retrieve bucket name and file_key from the S3 event
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    file_key = event['Records'][0]['s3']['object']['key']
    chunk, chunksize = [], 1000
    if file_key.endswith('.csv'):
        LOGGER.info('Reading {} from {}'.format(file_key, bucket_name))

        # get the object
        obj = s3.get_object(Bucket=bucket_name, Key=file_key)
        file_object = obj['Body']
        count = 0
        for i, line in enumerate(file_object):
            count += 1
            if (i % chunksize == 0 and i > 0):
                process_chunk(chunk)
                del chunk[:]
            chunk.append(line)





def process_chunk(chuck):
    print(len(chuck))

Ответы [ 2 ]

0 голосов
/ 17 июля 2019

Вы пробовали AWS Athena https://aws.amazon.com/athena/?его очень хорошо без сервера и платить как ходят.Без загрузки файла он делает все, что вы хотите.BlazingSql является открытым исходным кодом и также полезен в случае проблем с большими данными.

0 голосов
/ 06 марта 2019

Это будет делать то, что вы хотите достичь.Он не будет загружать весь файл в память, вместо этого будет загружаться порциями, обрабатывать и продолжать:

  from smart_open import smart_open
  import csv

  def get_s3_file_stream(s3_path):
      """
      This function will return a stream of the s3 file.
      The s3_path should be of the format: '<bucket_name>/<file_path_inside_the_bucket>'
      """
      #This is the full path with credentials:
      complete_s3_path = 's3://' + aws_access_key_id + ':' + aws_secret_access_key + '@' + s3_path
      return smart_open(complete_s3_path, encoding='utf8')

  def download_and_process_csv:
      datareader = csv.DictReader(get_s3_file_stream(s3_path))
      for row in datareader:
          yield process_csv(row) # write a function to do whatever you want to do with the CSV
...