Загрузка данных из Афины в информационный фрейм Pandas в Cloud9 / Lambda в AWS для ETL - PullRequest
1 голос
/ 08 апреля 2019

Я строю датак в AWS.Исходные данные импортируются в S3 как CDC.Мне нужно найти способ объединить их, чтобы иметь таблицу с самой последней версией информации.

Изначально я хотел использовать Glue для разработки ETL, но редактор кажется довольно неуклюжим.Кроме того, объем данных не так велик, что искра необходима.Панды тоже будут работать и имеют гораздо более широкую базу знаний в организации.

Итак, я использовал Glue для сканирования импорта, и теперь у меня есть таблицы Athena, для которых я хочу разработать свои агрегаты в Cloud9 для последующей миграции на функцию Lambda.

Проблема в том, что я не могу перенести данные Athena в фрейм данных.

Я попробовал функцию start_query_execution из boto3, но она не возвращает данные, а только записывает их в S3, что мне не нужно.Он также возвращается как QueryExecutionId, который я передал в другую функцию boto с именем get_query_results.Кажется, что есть ответ, но я борюсь за то, как передать данные в фрейм данных (это JSON или dict?).

#python 3.6
import pandas as pd
import numpy as np
import boto3
import time

#https://dev.classmethod.jp/cloud/run-amazon-athenas-query-with-aws-lambda/

#athena constant
DATABASE = 'myDatabase'
TABLE = 'myTable'

#output
S3_OUTPUT = 's3://myBucket/myPath/'

client = boto3.client('athena')

response = client.start_query_execution(
        QueryString='select * from myTable limit 100',
        QueryExecutionContext={
            'Database': DATABASE
        },
        ResultConfiguration={
            'OutputLocation': S3_OUTPUT,

        }
)

print(response["QueryExecutionId"])

time.sleep(50)

data = client.get_query_results(
    QueryExecutionId=response["QueryExecutionId"]
)

dataDf = pd.read_json(data["ResultSet"])
print(dataDf.head())

1 Ответ

0 голосов
/ 13 июня 2019

Это сработало для меня.Загрузите файл вместо использования ответа JSON.

import os
import boto3

s3 = boto3.client('s3')
bucket = 'myBucket'
key = 'myPath'

data_file_name = f'{response["QueryExecutionId"]}.csv'
object = os.path.join(key, data_file_name)
s3.download_file(bucket, object, data_file_name)
df = pd.read_csv(data_file_name)
...