Модульное тестирование лямбда-скрипта - PullRequest
3 голосов
/ 01 ноября 2019

Я написал лямбда-скрипт для управления жизненным циклом образов машин Amazon с использованием python и boto3. Скрипт работает нормально, но когда я понял, что должен написать для него модульные тесты, начался мой кошмар. Я не являюсь разработчиком, и я привык писать сценарии как SysAdmin.

Я уже создал модульные тесты для функций с возвращаемым состоянием, как показано ниже, и я отлично работаю.

def get_interface_wrapper(region, service, interface_type):
    interface_types = ['client', 'resource']
    interface = None

    if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
        interface = ("boto3." + interface_type +
                     "(" + "service_name=service," + "region_name=region)")

    return interface

def get_interface(region, service, interface_type):
    return eval(get_interface_wrapper(region, service, interface_type))

#Unit tests
def test_get_interface_client(self):

    service = 'ec2'
    interface_expression = 'boto3.client(service_name=service,region_name=region)'
    client_interface = get_interface_wrapper(
        self.region, service, 'client')
    self.assertEqual(client_interface, interface_expression)


def test_get_interface_resource(self):

    service = 'ec2'
    interface_expression = 'boto3.resource(service_name=service,region_name=region)'
    resource_interface = get_interface_wrapper(
        self.region, service, 'resource')
    self.assertEqual(resource_interface, interface_expression)

Однако для следующих функций, которые не имеют оператора возврата и полагаются на конечную точку AWS, я изо всех сил стараюсь обернуть ее вокруг. Как я могу смоделировать конечную точку или как я могу изменить свой код, чтобы создать модульный тест, который не зависит от конечных точек AWS.

def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
    for action in actions:

        action.update({'phase': 'planning', 'PlanningTime': datetime.utcnow(
        ).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
        put_log_events(logs_client, log_group, log_stream, [action])

        # The tag packer_ami_state_tagging_date is not set
        if (action['is_timestamp_present'] == True):

            if (action['action'] == 'update'):
                # The tag packer_ami_state_tagging_date is set, so update the state and tagging date
                try:

                    ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
                                                                                                 {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]

                except Exception as e:

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

            if (action['action'] == 'delete'):
                image = ec2_client.Image(action['ImageId'])
                snapshots = []
                for blockDevMapping in image.block_device_mappings:
                    if 'Ebs' in blockDevMapping:
                        snapshots.append(blockDevMapping['Ebs']['SnapshotId'])

                try:
                    image.deregister(DryRun=dryrun_enabled)
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]

                except Exception as e:
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

                counter = 1
                for snapshotID in snapshots:
                    snapshot = ec2_client.Snapshot(snapshotID)

                    try:
                        snapshot.delete(DryRun=dryrun_enabled)
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]

                    except Exception as e:
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]

                    finally:
                        put_log_events(logs_client, log_group,
                                       log_stream, operation_result)

                    counter += 1

            if (action['action'] == 'none'):
                action.update(
                    {'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]

                put_log_events(logs_client, log_group,
                               log_stream, operation_result)

        else:
            try:
                ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
                    {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]

            except Exception as e:
                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]

            finally:
                put_log_events(logs_client, log_group,
                               log_stream, operation_result)


def put_log_events(client, log_group_name, log_stream_name, log_events):
    log_stream = client.describe_log_streams(
        logGroupName=log_group_name,
        logStreamNamePrefix=log_stream_name
    )

    if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
        response = {
            'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
    else:
        response = {}

    for log_event in log_events:
        if bool(response):
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
                sequenceToken=response['nextSequenceToken']
            )
        else:
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
            )

1 Ответ

1 голос
/ 01 ноября 2019

Я рекомендую использовать исправления во встроенной библиотеке unittest.mock . Я использую это для макетирования всех вызовов boto3, чтобы никогда не использовать реальный сервис AWS. Вариантов много, но вот простой пример, который макетирует клиента.

Предположим, у вас есть код в модуле с именем "my_code", который импортирует boto3 и выполняет вызовы к "ssm" клиенту boto3 для get_parameters_by_pathфункция. Вы можете смоделировать это с помощью такого кода:

from unittest.mock import patch, MagicMock

...

@patch('my_app.my_code.boto3')
def test_secrets_load_ssm(self, mock_boto):
    mock_client = MagicMock()
    mock_boto.client.return_value = mock_client
    mock_client.get_parameters_by_path.return_value = helper_function()

    my_param = my_code.my_function_being_tested_that_fetches_a_parameter('/TEST_APP/CI/secure_string_test')

    self.assertEqual(my_param, 'secure string value')


def helper_function():
    return {'Parameters': [{'Name': '/TEST_APP/CI/secure_string_test',
                            'Type': 'SecureString',
                            'Value': 'secure string value',
                            'Version': 1,
                            'LastModifiedDate': datetime.datetime(2019, 8, 8, 14, 44, 26, 878000, tzinfo=datetime.timezone.utc),
                            'ARN': 'arn:aws:ssm:us-east-1:999478573200:parameter/TEST_APP/CI/secure_string_test'}], ResponseMetadata': {'RequestId': 'b9f016a4-485d-80d2-a504-015d081d8603',
                                 'HTTPStatusCode': 200,
                                 'HTTPHeaders': {'x-amzn-requestid': 'b9f016a4-475d-40d2-a504-015d981d8603',
                                                 'content-type': 'application/x-amz-json-1.1',
                                                 'content-length': '666',
                                                 'date': 'Fri, 30 Aug 2019 16:57:17 GMT'},
                                 'RetryAttempts': 0}
            }

Я поместил возвращаемое значение макета в отдельную вспомогательную функцию, поскольку это неСуть этого примера и быть тем JSON, который вам нужен, чтобы boto3 смоделировал как возврат. Если вы не знакомы с макетом и патчами юнит-тестов, вам придется немного увеличить его использование, но, сделав это сам, я могу засвидетельствовать, что это решит такие проблемы с юнит-тестами boto3 гораздо элегантнее.

Аннотация @patch позволяет заменять настоящую библиотеку boto3 созданными вами вызовами-макетами. В аннотации объявляется, какую импортируемую функцию вы исправляете, и для которой требуется соответствующая переменная в сигнатуре функции (mock_boto в этом примере). Следующая пара строк, которая устанавливает объект, который будет возвращен, когда код в функции, которую я тестирую, вызывает boto3.client (), а затем следующая строка устанавливает, что должно быть возвращено, когда код вызывает функцию get_parameters_by_path объекта клиента,В патчах есть такие функции, как assert_called_once на ваших макетах для проверки того, что функция была вызвана так, как ожидалось, поэтому, даже если функция ничего не возвращает, вы можете ее макетировать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...