Имитация ошибки ClientError boto3 (botocore) для вызова update_item таблицы Dynamodb - PullRequest
0 голосов
/ 12 июля 2020

Я пишу модульный тест для функции python, которая выполняет update_item в Dynamodb через ресурс таблицы. Нормальный модульный тест работает, но я не могу вызвать ошибку ClientError.

Тесты выполняются, но я всегда: E Failed: DID NOT RAISE <class 'botocore.exceptions.ClientError'>

Я пытаюсь смоделировать случай, когда я получу внутреннюю ошибку сервера AWS или ошибку RequestLimitExceeded.

Спасибо, что указали мне правильное направление!

Мой функциональный код:

def _updateCardConfiguration(identifier, cardType, layout):
    try:
        table.update_item(
            Key={
                'identifier': identifier,
                'cardType': cardType
            },
            UpdateExpression="SET layout = :layout",
            ExpressionAttributeValues={
                ':layout': layout
            }
        )

    except ClientError as e:
        logger.fatal(
            f"The update of the cardconfiguration for the niche: {identifier} has failed with the following message: {e.response['Error']['Message']}")
        raise

    return True


def lambda_handler(event, context):
    # Check the cardtype
    cardType = 'SearchresultCard' if (
        event['detail']['action'] == 'updateSearchresultCard') else 'DetailCard'

    # Update the card configuration
    _updateCardConfiguration(
        event['detail']['identifier'], cardType, event['detail']["layout"])


    return True

Мой тестовый код:

@pytest.fixture(scope='function')
def cardConfigurationsTable(aws_credentials):
    with mock_dynamodb2():
        #client = boto3.client('dynamodb', region_name='eu-west-1')
        dynamodb = boto3.resource('dynamodb')
        dynamodb.create_table(
            TableName='CardConfigurations',
            KeySchema=[
                {
                    'AttributeName': 'identifier',
                    'KeyType': 'HASH'
                },
                {
                    'AttributeName': 'cardType',
                    'KeyType': 'RANGE'
                }
            ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'identifier',
                    'AttributeType': 'S'
                },
                {
                    'AttributeName': 'cardType',
                    'AttributeType': 'S'
                },
            ],
            StreamSpecification={
                'StreamEnabled': True,
                'StreamViewType': 'NEW_AND_OLD_IMAGES'
            },
            BillingMode='PAY_PER_REQUEST',
            SSESpecification={
                'Enabled': True
            },
            GlobalSecondaryIndexes=[
                {
                    'IndexName': 'byIdentifier',
                    'KeySchema': [
                        {
                            'AttributeName': 'identifier',
                            'KeyType': 'HASH'
                        },
                        {
                            'AttributeName': 'cardType',
                            'KeyType': 'RANGE'
                        }
                    ],
                    'Projection': {
                        'ProjectionType': 'ALL'
                    }
                }
            ])

        yield dynamodb.Table('CardConfigurations')

def test_updateItem_raises_clientExecption(cardConfigurationsTable, env_vars):
    """ Unit test for testing the lambda that updates the cardconfiguration

    Scenario: No detail card is configured for the niche

      :param fixture cardConfigurationsTable: The fixture that mocks the dynamodb CardConfigurations table
      :param fixture env_vars: The fixture with the env vars
    """
    # Prepare the event
    with open('tests/unit/event-flows/dataconfiguration-state-transition/events/updateSearchresultCard.json') as json_file:
        event = json.load(json_file)

    stubber = Stubber(cardConfigurationsTable.meta.client)
    stubber.add_client_error(
        "update_item",
        service_error_code="InternalServerError",
        service_message="Internal Server Error",
        http_status_code=500,
    )
    stubber.activate()

    # Import the lambda handler
    lambdaFunction = importlib.import_module(
        "event-flows.dataconfiguration-state-transition.updateCardconfigurationInDb.handler")

    with pytest.raises(ClientError) as e_info:
        # Execute the handler
        response = lambdaFunction.lambda_handler(event, {})

...