В настоящее время я использую AWS Lambda для получения наборов данных CSV из таблиц S3 в RDS Mysql. Я протестировал лямбда-функцию, которую я создал, которая отлично выполнялась, но по некоторым причинам наборы данных не были вставлены в таблицы базы данных в RDS.
Это может быть вызвано триггерами или другой частью, кроме Lambda, но мне трудно найти, в чем здесь проблема, так как я очень плохо знаком с AWS Lambda.
Будем очень благодарны за любые предложения или рекомендации.
Вот функция лямбда:
import sys
import logging
import rds_config
import pymysql
import numpy as np
import csv
import pandas as pd
import mysql.connector
import sqlalchemy
from sqlalchemy import create_engine
import boto3
import botocore
Bucket_Name = 'ceo-project'
s3_client = boto3.client('s3')
name = rds_config.db_username
password = rds_config.db_password
db_name = rds_config.db_name
rds_host = rds_config.rds_host
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
conn = pymysql.connect(host=rds_host, user=name, passwd=password, port =3306, db=db_name, connect_timeout=30)
except pymysql.MySQLError as e:
logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
logger.error(e)
sys.exit()
logger.info("SUCCESS: Connection to RDS MySQL instance succeeded")
engine = create_engine('mysql+pymysql://'+name+':'+password+'@'+rds_host+':3306'+'/'+db_name)
def handler(event, context):
def S3_get_csv(Bucket_Name, Folder_Name,i):
if isinstance(prefix, str):
kwargs['Prefix'] = prefix
while True:
resp = s3_client.list_objects_v2(Bucket=Bucket_Name, Prefix=Folder_Name+'/')
latest = max(contents, key=lambda x: x['LastModified'])
try:
contents = resp['Contents']
except KeyError:
return
for obj in contents:
key = obj['Key']
if key.startswith(prefix):
yield obj
try:
s3.Bucket(Bucket_Name).download_file(latest['Key'],latest['Key'].split(sep='/')[1])
df = pd.read_csv(latest['Key'].split(sep='/')[1])
df.to_sql(name=table_list[i], con=engine, if_exists = 'append', index=False,chunksize=10000,method='multi')
except botocore.exceptions.ClientError as e:
if e.resp['Error']['Code'] == "404":
print("The object does not exist.")
else:
raise
def get_matching_s3_keys(bucket, prefix='', suffix=''):
for obj in get_matching_s3_objects(bucket, prefix):
yield obj['Key']
Folder_list = ['1','2','3','4','5','6','7']
table_list = ['a','b','c','d','e','f','g']
for i in range(0,7):
S3_get_csv(Bucket_Name,Folder_list[i],i)
return "CSV files inserted to RDS"