Я пишу модульный тест для функции python, которая выполняет update_item в Dynamodb через ресурс таблицы. Нормальный модульный тест работает, но я не могу вызвать ошибку ClientError.
Тесты выполняются, но я всегда: E Failed: DID NOT RAISE <class 'botocore.exceptions.ClientError'>
Я пытаюсь смоделировать случай, когда я получу внутреннюю ошибку сервера AWS или ошибку RequestLimitExceeded.
Спасибо, что указали мне правильное направление!
Мой функциональный код:
def _updateCardConfiguration(identifier, cardType, layout):
try:
table.update_item(
Key={
'identifier': identifier,
'cardType': cardType
},
UpdateExpression="SET layout = :layout",
ExpressionAttributeValues={
':layout': layout
}
)
except ClientError as e:
logger.fatal(
f"The update of the cardconfiguration for the niche: {identifier} has failed with the following message: {e.response['Error']['Message']}")
raise
return True
def lambda_handler(event, context):
# Check the cardtype
cardType = 'SearchresultCard' if (
event['detail']['action'] == 'updateSearchresultCard') else 'DetailCard'
# Update the card configuration
_updateCardConfiguration(
event['detail']['identifier'], cardType, event['detail']["layout"])
return True
Мой тестовый код:
@pytest.fixture(scope='function')
def cardConfigurationsTable(aws_credentials):
with mock_dynamodb2():
#client = boto3.client('dynamodb', region_name='eu-west-1')
dynamodb = boto3.resource('dynamodb')
dynamodb.create_table(
TableName='CardConfigurations',
KeySchema=[
{
'AttributeName': 'identifier',
'KeyType': 'HASH'
},
{
'AttributeName': 'cardType',
'KeyType': 'RANGE'
}
],
AttributeDefinitions=[
{
'AttributeName': 'identifier',
'AttributeType': 'S'
},
{
'AttributeName': 'cardType',
'AttributeType': 'S'
},
],
StreamSpecification={
'StreamEnabled': True,
'StreamViewType': 'NEW_AND_OLD_IMAGES'
},
BillingMode='PAY_PER_REQUEST',
SSESpecification={
'Enabled': True
},
GlobalSecondaryIndexes=[
{
'IndexName': 'byIdentifier',
'KeySchema': [
{
'AttributeName': 'identifier',
'KeyType': 'HASH'
},
{
'AttributeName': 'cardType',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'ALL'
}
}
])
yield dynamodb.Table('CardConfigurations')
def test_updateItem_raises_clientExecption(cardConfigurationsTable, env_vars):
""" Unit test for testing the lambda that updates the cardconfiguration
Scenario: No detail card is configured for the niche
:param fixture cardConfigurationsTable: The fixture that mocks the dynamodb CardConfigurations table
:param fixture env_vars: The fixture with the env vars
"""
# Prepare the event
with open('tests/unit/event-flows/dataconfiguration-state-transition/events/updateSearchresultCard.json') as json_file:
event = json.load(json_file)
stubber = Stubber(cardConfigurationsTable.meta.client)
stubber.add_client_error(
"update_item",
service_error_code="InternalServerError",
service_message="Internal Server Error",
http_status_code=500,
)
stubber.activate()
# Import the lambda handler
lambdaFunction = importlib.import_module(
"event-flows.dataconfiguration-state-transition.updateCardconfigurationInDb.handler")
with pytest.raises(ClientError) as e_info:
# Execute the handler
response = lambdaFunction.lambda_handler(event, {})