Лямбда-фн, чтобы получить файл s3, используйте его, чтобы изменить другой файл s3, затем перепишите его на s3 - PullRequest
0 голосов
/ 12 октября 2018

Это код Python, который я использовал для манипулирования файлом table1 с помощью ссылочного файла pds_ref. Таким образом, pds_ref выглядит так:

|THE_TABLE|THE_KEY
|table1|3
|table1|1

table1 выглядит так

|ID|NAME
|1|Imran
|2|Peter
|3|Pedro
|4|Carlos

Идея состоит в том, чтобы использовать ссылки в pds_ref, чтобы удалить записи из любой таблицы, которая указана в списке, и соответствующий ей ключ ... в этом случае 1 и 3 должны быть удалены

Этот код Python работает так же, как импорт Python CSV

with open("pds_ref","rb") as ref_file:
    refreader=csv.DictReader(ref_file, delimiter='|')
    reftable=[row for row in refreader]
    refheader = refreader.fieldnames    
    for refrow in reftable:
        print refrow['THE_TABLE']   
        print refrow['THE_KEY']
        with open(refrow['THE_TABLE'], "rbw") as infile:
                reader = csv.DictReader(infile, delimiter='|')
                table = [row for row in reader]
                header = reader.fieldnames 
        with open(refrow['THE_TABLE'], "wb") as outfile:
                writer = csv.DictWriter(outfile, header,delimiter='|')
                writer.writeheader()
                for row in table:
                    if row['ID'] != refrow['THE_KEY'] :
                        writer.writerow(row)

Теперь я хочу сделать это с помощью лямбды, чтобы эта функция запускалась каждый раз, когда кто-то загружает файл pds_ref

Я дошел до возможности получить файл pds_ref и прочитать каждыйстрока, но возникают проблемы с аналогичным открытием и записью измененного файла table1.Любая помощь приветствуется.

import boto3
import csv
import io



def lambda_handler(event, context):
    s3 = boto3.client("s3")

    if event:
        print ("Event : ", event)
        file_obj = event["Records"][0]
        filename = str(file_obj['s3']['object']['key'])
        bucketname = str(file_obj['s3']['bucket']['name'])
        print("Filename: ",filename)
        print("Bucket: ",bucketname)
        fileObj = s3.get_object(Bucket= "lambda-trig1",Key=filename)
        print ("fileObj: ",fileObj)
        file_content = fileObj["Body"].read().decode('utf-8')
        print(file_content)

        f_pds_ref = s3.get_object(Bucket= "lambda-trig1",Key='pds_ref')
        fc_pds_ref = f_pds_ref['Body'].read().decode('utf-8').splitlines(True) 

        for refrow in csv.DictReader(fc_pds_ref,delimiter='|'):
            print refrow['THE_TABLE']
            print refrow['THE_KEY']
            current_table = refrow['THE_TABLE']
            current_key = refrow['THE_KEY']
            f_the_next_table = s3.get_object(Bucket= "lambda-trig1",Key=current_table)
            fc_the_next_table = f_the_next_table['Body'].read().decode('utf-8').splitlines(True) 
            with open(refrow[f_the_next_table], "rbw") as infile:
                reader = csv.DictReader(infile, delimiter='|')
            #   table = [row for row in reader]
            #   header = reader.fieldnames 
            #   print (header)

1 Ответ

0 голосов
/ 13 октября 2018

Перед запуском процесса обновления другой таблицы,
вы хотите убедиться, что он работает только для Put событий.

Вот несколько дополнений к вашим текущим шагам после прочтения pds_ref:

  • Группировать все THE_KEY s по THE_TABLE.

    Это позволяет выполнять уникальные итерации для обновления объектов таблицы вместо нескольких итераций для содержимого в одном и том же объекте таблицы..

  • Для каждой группы THE_TABLE,
    читать объект таблицы и отфильтровывать строки в группе THE_KEY,
    записывать отфильтрованное содержимое в объект таблицы.

Это может быть реализовано следующим образом

from contextlib import contextmanager
from csv import DictReader, DictWriter
from collections import defaultdict
import io

import boto3

s3 = boto3.client("s3")

BUCKET = "creeper-bank"
DELIMITER = "|"
TABLE_OBJECT_COLUMNS = ['', 'ID', 'NAME']
WATCH_KEY = "pds_ref"


def content_as_dict_reader(content):
    yield DictReader(
        content.splitlines(),
        delimiter=DELIMITER)

@contextmanager
def tables_and_lines_for_deletion():
    object_ = s3.get_object(
        Bucket=BUCKET, Key=WATCH_KEY
    )
    content = object_["Body"].read().decode('utf-8')
    return content_as_dict_reader(content)

@contextmanager
def table_record(table):
    object_ = s3.get_object(
        Bucket=BUCKET, Key=table
    )
    content = object_["Body"].read().decode('utf-8')
    return content_as_dict_reader(content)

def object_table(table, record):
    with io.StringIO() as file_:
        writer = DictWriter(
            file_,
            fieldnames=TABLE_OBJECT_COLUMNS,
            delimiter=DELIMITER
        )
        writer.writeheader()
        writer.writerows(list(record))

        s3.put_object(
            Bucket=BUCKET,
            Key=table,
            Body=file_.getvalue()
        )

def lambda_handler(event, context):
    if not event:
        print("Function must be triggered via a published event")
        return

    event_record, *_ = event["Records"]
    match_watchkey = True
    try:
        event_name = str(event_record['eventName'])
        if "Put" not in event_name:
            match_watchkey = False

        s3_event = event_record['s3']
        print("checking if S3 event is a put one for :WATCH_KEY")

        key = s3_event['object']['key']
        bucket = s3_event['bucket']['name']

        if key != WATCH_KEY:
            match_watchkey = False
        if bucket != BUCKET:
            match_watchkey = False
    except KeyError:
        # Handle when event_record isn't an S3 one.
        match_watchkey = False
    if not match_watchkey:
        print("Published event did not match :WATCH_KEY.")
        return

    print("S3 event is a put one for :WATCH_KEY!")

    table_group = defaultdict(list)

    print("Reading :WATCH_KEY content")
    with tables_and_lines_for_deletion() as tables:
        for dct in tables:
            table_k = dct['THE_TABLE']
            table_v = dct['THE_KEY']
            table_group[table_k].append(table_v)

    print("Updating objects found in :WATCH_KEY content")
    for t, ids in table_group.items():
        record_update = None
        with table_record(t) as record:
            record_update = (
                dct
                for dct in record
                if dct["ID"] not in ids
            )
        object_table(t, record_update)
    print("Update completed!")
    return

Тестирование с примером события

sample_event = {
    'Records': [
        {
            'eventName':  'ObjectCreated:Put',

            's3': {
                'bucket': {
                    'name': 'creeper-bank',
                },
                'object': {
                    'key': 'pds_ref',
                }
            },
        }
    ]
}
lambda_handler(sample_event, {})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...