Я пытаюсь закодировать асинхронный вызов API, чтобы использовать athena для запуска действия - start_query_execution .Я не использую boto3 или любую другую библиотеку.
Буду признателен за любую поддержку этого запроса.
Я попытался изменить следующий код на Мэтью Маркус ,который использует эквивалент lambda.invoke, используя то, что я хочу сделать.Однако мне не удалось преобразовать код в экземпляр athena.start_query_execution.
ОБРАЗЕЦ МОЕГО ТЕКУЩЕГО КОДА:
request_parameters = '{'
request_parameters += 'QueryString: "SELECT * FROM <table_name> LIMIT 1;",'
request_parameters += 'ResultConfiguration: {"OutputLocation": "outputlocation"}'
request_parameters += '}'
creds = Session().get_credentials()
ATHENA_ENDPOINT_BASE = 'https://athena.{region}.amazonaws.com/'
def create_signed_headers(url, request_parameters):
host_segments = urlparse(url).netloc.split('.')
print('6. host_segments: ' , host_segments)
service = host_segments[0]
print('7. service: ' , service)
region = host_segments[1]
print('8. region: ' , region)
request = AWSRequest(method ='POST',
url = url,
data=request_parameters)
print('9. request: ', request)
SigV4Auth(creds, service, region).add_auth(request)
return dict(request.headers.items())
#query-string and result-configuration[output location] are *REQUIRED for start query execution
request_parameters
async def start_query_execution(url, request_parameters ,session):
signed_headers = create_signed_headers(url, request_parameters)
print('10. signed_headers: ' , signed_headers)
async with session.post(url,
json = request_parameters,
headers = signed_headers) as response:
return await response.json()
def generate_queries(name_and_request, base_url, session):
for name, request_parameters in name_and_request:
url = join(base_url, name, 'queries')
print('5. url: ' , url)
yield start_query_execution(url, request_parameters, session)
def start_all(name_and_request, region = 'eu-west-1'):
base_url = ATHENA_ENDPOINT_BASE.format(region = region)
print('3. base_url: ' , base_url)
async def wrapped():
async with ClientSession(raise_for_status=True) as session:
queries = generate_queries(name_and_request,
base_url,
session)
print('4. queries: ', queries)
return await asyncio.gather(*queries)
return asyncio.get_event_loop().run_until_complete(wrapped())
def main():
name = 'hello-world-{}'
name_and_request = ((name.format(i), dict(hello=i)) for i in range(10))
print('1. name_and_request: ' , name_and_request)
athena_responses = start_all(name_and_request)
print('2. athena_responses: ', athena_responses)
if __name__ == '__main__':
main()