Я пишу pytest для моей лямбды, которая использует AThena, создал pytest.fixture, используя следующий код:
@pytest.fixture()
def moto_boto_athena():
@mock_athena
def boto_resource_athena():
res = boto3.client('athena')
return res
return boto_resource_athena
Моя лямбда имеет код ниже:
response = get_athena_client().start_query_execution(
QueryString=query_string,
QueryExecutionContext={
'Database': athena_database
},
ResultConfiguration={
'OutputLocation': athena_output_bucket,
}
)
query_status = None
#Check the status of QueryExecution
while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
query_status = get_athena_client(). \
get_query_execution(QueryExecutionId=response['QueryExecutionId'])['QueryExecution']['Status']['State']
if query_status == 'FAILED' or query_status == 'CANCELLED':
raise Exception
Но как как только я начну выполнение теста, получаю исключение:
raise NotImplementedError(
> "The {0} action has not been implemented".format(action)
)
E NotImplementedError: The start_query_execution action has not been implemented
..\..\..\..\..\AppData\Roaming\Python\Python36\site-packages\moto\core\responses.py:393: NotImplementedError
Пожалуйста, руководство.