Невозможно прочитать запрос Athena в pandas фрейм данных - PullRequest
0 голосов
/ 05 апреля 2020

У меня есть приведенный ниже код, и я хочу, чтобы он правильно возвращал фрейм данных. Лог опроса c работает, но фрейм данных не создается и не возвращается. Прямо сейчас он просто возвращает None при вызове.

import boto3
import pandas as pd
import io
import re
import time

AK='mykey'
SAK='mysecret'

params = {
    'region': 'us-west-2',
    'database': 'default',
    'bucket': 'my-bucket',
    'path': 'dailyreport',
    'query': 'SELECT * FROM v_daily_report LIMIT 100'
}

session = boto3.Session(aws_access_key_id=AK,aws_secret_access_key=SAK)


# In[32]:


def athena_query(client, params):

    response = client.start_query_execution(
        QueryString=params["query"],
        QueryExecutionContext={
            'Database': params['database']
        },
        ResultConfiguration={
            'OutputLocation': 's3://' + params['bucket'] + '/' + params['path']
        }
    )
    return response


def athena_to_s3(session, params, max_execution = 5):
    client = session.client('athena', region_name=params["region"])
    execution = athena_query(client, params)
    execution_id = execution['QueryExecutionId']
    df = poll_status(execution_id, client)
    return df

def poll_status(_id, client):
    '''
    poll query status
    '''
    result = client.get_query_execution(
        QueryExecutionId = _id
    )

    state = result['QueryExecution']['Status']['State']
    if state == 'SUCCEEDED':
        print(state)
        print(str(result))
        s3_key = 's3://' + params['bucket'] + '/' + params['path']+'/'+ _id + '.csv'
        print(s3_key)
        df = pd.read_csv(s3_key)
        return df
    elif state == 'QUEUED':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'RUNNING':
        print(state)
        print(str(result))
        time.sleep(1)
        poll_status(_id, client)
    elif state == 'FAILED':
        return result
    else:
        print(state)
        raise Exception


df_data = athena_to_s3(session, params)

print(df_data)

Я планирую переместить нагрузку на информационный фрейм из функции опроса, но сейчас просто пытаюсь заставить ее работать так, как есть.

Ответы [ 2 ]

0 голосов
/ 06 апреля 2020

Я рекомендую вам взглянуть на AWS Wrangler вместо того, чтобы использовать традиционный boto3 Athena API. Этот новый и более специфичный c интерфейс для всех данных в AWS, включая запросы к Athena и предоставляющий больше функций.

import awswrangler as wr

df = wr.pandas.read_sql_athena(
    sql="select * from table",
    database="database"
)

Благодаря комментарию @RagePwn стоит проверить PyAthena в качестве альтернативы опции boto3 для запроса Athena.

0 голосов
/ 05 апреля 2020

Если он возвращает None, то это потому, что состояние == 'FAILED'. Вам необходимо выяснить причину сбоя, которая может быть в «StateChangeReason».

{
    'QueryExecution': {
        'QueryExecutionId': 'string',
        'Query': 'string',
        'StatementType': 'DDL'|'DML'|'UTILITY',
        'ResultConfiguration': {
            'OutputLocation': 'string',
            'EncryptionConfiguration': {
                'EncryptionOption': 'SSE_S3'|'SSE_KMS'|'CSE_KMS',
                'KmsKey': 'string'
            }
        },
        'QueryExecutionContext': {
            'Database': 'string'
        },
        'Status': {
            'State': 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED',
            'StateChangeReason': 'string',
            'SubmissionDateTime': datetime(2015, 1, 1),
            'CompletionDateTime': datetime(2015, 1, 1)
        },
        'Statistics': {
            'EngineExecutionTimeInMillis': 123,
            'DataScannedInBytes': 123,
            'DataManifestLocation': 'string',
            'TotalExecutionTimeInMillis': 123,
            'QueryQueueTimeInMillis': 123,
            'QueryPlanningTimeInMillis': 123,
            'ServiceProcessingTimeInMillis': 123
        },
        'WorkGroup': 'string'
    }
}
...