Чтение нескольких файлов из корзины S3 и их обработка с использованием лямбда-триггера - PullRequest
0 голосов
/ 04 июля 2019

Я читаю несколько файлов в S3, обрабатываю их и затем создаю таблицы в AWS RDS с этими обработанными кадрами данных.Я делаю все это на моей Mac OS, используя PyCharm.

Я хочу прочитать эти CSV-файлы из корзины S3 и запустить этот же скрипт Python для обработки этих файлов в AWS, а не в моей локальной системе.Я хочу использовать лямбду для запуска этого скрипта, и он должен работать только тогда, когда все необходимые файлы загружены в корзину.

Как будет изменяться код в AWS Lambda?

Мой нынешний кодкак показано ниже -

import boto3
import pandas as pd
import numpy as np
import sys

client = boto3.client('s3')
resource = boto3.resource('s3')
my_bucket = resource.Bucket('test-s3')



#CREATE ALL THE NEEDED OBJECTS
obj1 = client.get_object(Bucket='test-s3', Key='file1.csv')
obj2 = client.get_object(Bucket='test-s3', Key='file2.csv')
obj3 = client.get_object(Bucket='test-s3', Key='file3.csv')
obj4 = client.get_object(Bucket='test-s3', Key='file4.csv')
obj5 = client.get_object(Bucket='test-s3', Key='file5.csv')
obj6 = client.get_object(Bucket='test-s3', Key='file6.csv')
obj7 = client.get_object(Bucket='test-s3', Key='file7.csv')
obj8 = client.get_object(Bucket='test-s3', Key='file8.csv')
obj9 = client.get_object(Bucket='test-s3', Key='file9.csv')
obj10 = client.get_object(Bucket='test-s3', Key='file10.csv')
obj11 = client.get_object(Bucket='test-s3', Key='file11.csv')
obj12 = client.get_object(Bucket='test-s3', Key='file12.csv')
obj13 = client.get_object(Bucket='test-s3', Key='file13.csv')
obj14 = client.get_object(Bucket='test-s3', Key='file14.csv')
obj15 = client.get_object(Bucket='test-s3', Key='file15.csv')


#CREATE ALL THE DATAFRAMES FROM RESPECTIVE OBJECTS
df_file1 = pd.read_csv(obj1['Body'], encoding='utf-8', sep = ',')
df_file2 = pd.read_csv(obj2['Body'], encoding='utf-8', sep = ',')
df_file3 = pd.read_csv(obj3['Body'], encoding='utf-8', sep = ',')
df_file4 = pd.read_csv(obj4['Body'], encoding='utf-8', sep = ',')
df_file5 = pd.read_csv(obj5['Body'], encoding='utf-8', sep = ',')
df_file6 = pd.read_csv(obj6['Body'], encoding='utf-8', sep = ',')
df_file7 = pd.read_csv(obj7['Body'], encoding='utf-8', sep = ',')
df_file8 = pd.read_csv(obj8['Body'], encoding='utf-8', sep = ',')
df_file9 = pd.read_csv(obj9['Body'], encoding='utf-8', sep = ',')
df_file10 = pd.read_csv(obj10['Body'], encoding='utf-8', sep = ',')
df_file11 = pd.read_csv(obj11['Body'], encoding='utf-8', sep = ',')
df_file12 = pd.read_csv(obj12['Body'], encoding='utf-8', sep = ',')
df_file13 = pd.read_csv(obj13['Body'], encoding='utf-8', sep = ',')
df_file14 = pd.read_csv(obj14['Body'], encoding='utf-8', sep = ',')
df_file15 = pd.read_csv(obj15['Body'], encoding='utf-8', sep = ',')


#+++++++++++ make a function to process the data frames ++++++++++++


def function(df_file1, df_file2):
     *** some logic ***

        return df_final



## MAKE THE TABLES IN RDS

from sqlalchemy import create_engine
import psycopg2
engine = create_engine('postgresql://USERNAME:PASSWORD@***.eu-central-1.rds.amazonaws.com:5432/DBNAME')
df_final.to_sql('table name', engine, schema='data')

Я новичок в AWS Lambda.Как мне запустить этот скрипт на Lambda?

После того, как я принял предложение Ninad, я отредактировал скрипт.Это как показано ниже -

import boto3
import pandas as pd
import numpy as np
import sys

client = boto3.client('s3')
resource = boto3.resource('s3')
my_bucket = resource.Bucket('test-s3')

def function(df_file1, df_file2):
     *** some logic ***

        return df_final



def lambda_handler(event, context):
    obj1 = client.get_object(Bucket='test-s3', Key='file1.csv')
    obj2 = client.get_object(Bucket='test-s3', Key='file2.csv')
    obj3 = client.get_object(Bucket='test-s3', Key='file3.csv')


    df_file1 = pd.read_csv(obj1['Body'], encoding='utf-8', sep=',')
    df_file2 = pd.read_csv(obj2['Body'], encoding='utf-8', sep=',')
    df_file3 = pd.read_csv(obj3['Body'], encoding='utf-8', sep=',')


    df_final = function(df_file1, df_file2)

    from sqlalchemy import create_engine
    import psycopg2
    engine = create_engine('postgresql://USERNAME:PASSWORD@***.eu-central-1.rds.amazonaws.com:5432/DBNAME')
    df_final.to_sql('table name', engine, schema='data')

Я создал виртуальную среду в своей локальной системе и установил все пакеты - pandas, SQLAlchemy и т. Д. Я заархивировал этот пакет и скрипт и загрузил его в Lambda.Теперь я получаю эту ошибку -

[ERROR] Runtime.ImportModuleError: Unable to import module 'lambda_function': No module named 'pandas'

Я следовал по ссылке развертывания пакета aws , чтобы упаковать все необходимые вещи.Почему я все еще получаю сообщение об ошибке?

1 Ответ

0 голосов
/ 04 июля 2019

Используйте консоль для создания лямбды.Выберите правильную версию Python, которую вы хотите, а также убедитесь, что вы выделили достаточно памяти и установите время ожидания до 15 минут (максимум).При создании лямбды это также позволит вам добавить к ней роль.Создайте роль и прикрепите к ней политику, которая позволит вам получить доступ к корзине s3, где будут находиться ваши CSV.

Следующим шагом является создание слоя для вашей лямбды, который будет иметь все зависимости, необходимые для запуска вашего скрипта.По умолчанию в Lambda установлен пакет boto3, но вам нужно будет установить pandas (со всеми зависимостями), sqlalchemy и psycopg2.Вы можете найти простое руководство о том, как это сделать здесь

Теперь, когда вы создали слой, присоедините этот слой к своей лямбде.

Наконец-то мы можем перейти к вашему сценарию.Так как вам нужно прочитать все CSV-файлы на вашем пути S3, вам придется изменить свой сценарий для динамического чтения CSV-файлов.В настоящее время вы жестко закодировали имена файлов CSV.Вы можете изменить свой скрипт, чтобы сначала получить все ключи в вашем ведре, используя что-то вроде:

response = client.list_objects_v2(
    Bucket=my_bucket
)['Contents']

Это даст вам список ваших ключей.Отфильтруйте их, если вам нужно.

Далее вы можете создать несколько фреймов данных, просматривая ответ, подобный следующему:

d = {}
for idx, obj in enumerate(response):
    d['df_'+idx] = pd.read_csv(client.get_object(Bucket='test-s3', Key=obj['Key'])['Body'], encoding='utf-8', sep = ',')

Это создаст словарь d со всеми вашими фреймами данных.Пожалуйста, сначала попробуйте этот код локально, чтобы исправить любые ошибки.

Теперь скопируйте ваш окончательный код и вставьте его в лямбда-редактор над def lambda handler(): Вызовите свою функцию из функции-обработчика лямбда-выражения.

...