Как мне написать код автономного асинхронного вызова start_query_execution API-интерфейса Athena в Python - PullRequest
0 голосов
/ 07 марта 2019

Я пытаюсь закодировать асинхронный вызов API, чтобы использовать athena для запуска действия - start_query_execution .Я не использую boto3 или любую другую библиотеку.

Буду признателен за любую поддержку этого запроса.

Я попытался изменить следующий код на Мэтью Маркус ,который использует эквивалент lambda.invoke, используя то, что я хочу сделать.Однако мне не удалось преобразовать код в экземпляр athena.start_query_execution.

ОБРАЗЕЦ МОЕГО ТЕКУЩЕГО КОДА:

request_parameters = '{'
request_parameters += 'QueryString: "SELECT * FROM <table_name> LIMIT 1;",'
request_parameters += 'ResultConfiguration: {"OutputLocation": "outputlocation"}'
request_parameters += '}'

creds = Session().get_credentials()
ATHENA_ENDPOINT_BASE = 'https://athena.{region}.amazonaws.com/'

def create_signed_headers(url, request_parameters):
    host_segments = urlparse(url).netloc.split('.')
    print('6. host_segments: ' , host_segments)
    service = host_segments[0]
    print('7. service: ' , service)
    region = host_segments[1]
    print('8. region: ' , region)
    request = AWSRequest(method ='POST',
                         url = url,
                         data=request_parameters)
    print('9. request: ', request)
    SigV4Auth(creds, service, region).add_auth(request)
    return dict(request.headers.items())

#query-string and result-configuration[output location] are *REQUIRED for start query execution

request_parameters

async def start_query_execution(url, request_parameters ,session):
    signed_headers = create_signed_headers(url, request_parameters)
    print('10. signed_headers: ' , signed_headers)
    async with session.post(url,
                            json = request_parameters,
                            headers = signed_headers) as response:
        return await response.json()

def generate_queries(name_and_request, base_url, session):
    for name, request_parameters in name_and_request:
        url = join(base_url, name, 'queries')
        print('5. url: ' , url)
        yield start_query_execution(url, request_parameters, session)

def start_all(name_and_request, region = 'eu-west-1'):
    base_url = ATHENA_ENDPOINT_BASE.format(region = region)
    print('3. base_url: ' , base_url)
    async def wrapped():
        async with ClientSession(raise_for_status=True) as session:
                queries = generate_queries(name_and_request,
                                                   base_url,
                                                   session)
                print('4. queries: ', queries)
                return await asyncio.gather(*queries)

    return asyncio.get_event_loop().run_until_complete(wrapped())

def main():
    name = 'hello-world-{}'
    name_and_request = ((name.format(i), dict(hello=i)) for i in range(10))
    print('1. name_and_request: ' , name_and_request)

    athena_responses = start_all(name_and_request)
    print('2. athena_responses: ', athena_responses)

if __name__ == '__main__':
    main()
...