Как прочитать файл CSV из s3 и записать содержимое в таблицу базы данных RDS, используя python лямбда-функцию? - PullRequest
0 голосов
/ 28 января 2020

У меня есть CSV-файл Employee.csv в корзине S3 со всей информацией о сотруднике: имя, возраст, зарплата, назначение. Я должен написать python лямбда-функцию, чтобы прочитать этот файл и записать в RDS db, например, он должен создать таблицу как Employee, с данными столбцов, возрастом, зарплатой, назначением и строками.

Employee.csv - это, например, например, фактически это может быть любой CSV-файл с любым количеством столбцов в нем.

Ответы [ 2 ]

2 голосов
/ 28 января 2020
from __future__ import print_function
import boto3
import logging
import os
import sys
import uuid
import pymysql
import csv
import rds_config


rds_host  = rds_config.rds_host
name = rds_config.db_username
password = rds_config.db_password
db_name = rds_config.db_name


logger = logging.getLogger()
logger.setLevel(logging.INFO)

try:
    conn = pymysql.connect(rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
except Exception as e:
    logger.error("ERROR: Unexpected error: Could not connect to MySql instance.")
    logger.error(e)
    sys.exit()

logger.info("SUCCESS: Connection to RDS mysql instance succeeded")

s3_client = boto3.client('s3')

def handler(event, context):

    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key'] 
    download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)

    s3_client.download_file(bucket, key,download_path)

    csv_data = csv.reader(file( download_path))

    with conn.cursor() as cur:
        for idx, row in enumerate(csv_data):

            logger.info(row)
            try:
                cur.execute('INSERT INTO target_table(name, age, salary, designation)' \
                                'VALUES("%s", "%s", "%s", "%s")'
                                , row)
            except Exception as e:
                logger.error(e)

            if idx % 100 == 0:
                conn.commit()

        conn.commit()

    return 'File loaded into RDS:' + str(download_path)
0 голосов
/ 11 февраля 2020

Вот код, который работает для меня сейчас:

s3 = boto3.resource('s3')
    file_object=event['Records'][0]
    key=str(file_object['s3']['object']['key'])
    obj = s3.Object(bucket, key)

    content_lines=obj.get()['Body'].read().decode('utf-8').splitlines(True)

    tableName= key.strip('folder/').strip('.csv')

    with conn.cursor() as cur:
        try:
            cur.execute('TRUNCATE TABLE '+tableName)
        except Exception as e:
            print("ERROR: Unexpected error:Table does not exit.")
            sys.exit()        
        header=True
        for row in csv.reader(content_lines):
            if(header):
                numberOfColumns=len(row)
                columnNames= str(row).replace('[','').replace(']','').replace("'",'')
                print("columnNames:"+columnNames)
                values='%s'
                numberOfValues=len(values)
                numberOfValues=1
                while numberOfValues< numberOfColumns:    
                    values=values+",%s"
                    numberOfValues+=1
                print("INSERT into "+tableName+"("+columnNames+") VALUES("+values+")")
                header=False
            else:
                try:
                    cur.execute('INSERT into '+tableName+'('+columnNames+') VALUES('+values+')', row)
                except Exception as e:
                    raise e
        conn.commit()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...