Как смоделировать функциональность модуля boto3 с помощью pytest - PullRequest
0 голосов
/ 18 марта 2020

У меня есть собственный модуль с именем sqs.py. Сценарий выполняет следующие действия:

  1. Получение сообщения от AWS SQS
  2. Получение пути AWS S3 для удаления
  3. Удаление пути
  4. Отправить электронное письмо с подтверждением пользователю

Я пытаюсь написать модульные тесты для этого модуля, которые проверят, что код будет выполняться должным образом, и вызовет исключения, когда они произойдут. .

Это означает, что мне нужно будет высмеивать ответ от вызовов Boto3, которые я делаю. Моя проблема в том, что код сначала устанавливает sh клиента SQS для получения сообщения, а затем второй вызов устанавливает sh клиента S3. Я не уверен, как смоделировать эти 2 независимых вызова и подделать ответ, чтобы я мог проверить функциональность моего сценария. Возможно, мой подход неверен. В любом случае, любые советы о том, как сделать это правильно, приветствуются.

Вот как выглядит код:

import boto3
import json
import os
import pprint
import time
import asyncio
import logging
from send_email import send_email

queue_url = 'https://xxxx.queue.amazonaws.com/1234567890/queue'

def shutdown(message):
    """ Sends shutdown command to OS """
    os.system(f'shutdown +5 "{message}"')

def send_failure_email(email_config: dict, error_message: str):
    """ Sends email notification to user with error message attached. """
    recipient_name = email_config['recipient_name']
    email_config['subject'] = 'Subject: Restore Failed'
    email_config['message'] = f'Hello {recipient_name},\n\n' \
                           + 'We regret that an error has occurred during the restore process. ' \
                           + 'Please try again in a few minutes.\n\n' \
                           + f'Error: {error_message}.\n\n' \
    try:
        send_email(email_config)
    except RuntimeError as error_message:
        logging.error(f'ERROR: cannot send email to user. {error_message}')

async def restore_s3_objects(s3_client: object, p_bucket_name: str, p_prefix: str):
    """Attempts to restore objects specified by p_bucket_name and p_prefix.

    Returns True if restore took place, false otherwise.
    """

    is_truncated = True
    key_marker = None
    key = ''
    number_of_items_restored = 0
    has_restore_occured = False
    logging.info(f'performing restore for {p_bucket_name}/{p_prefix}')
    try:
        while is_truncated == True:
            if not key_marker:
                version_list = s3_client.list_object_versions(
                    Bucket = p_bucket_name,
                    Prefix = p_prefix)
            else:
                version_list = s3_client.list_object_versions(
                    Bucket = p_bucket_name,
                    Prefix = p_prefix,
                    KeyMarker = key_marker)

            if 'DeleteMarkers' in version_list:
                logging.info('found delete markers')
                delete_markers = version_list['DeleteMarkers']
                for d in delete_markers:
                    if d['IsLatest'] == True:
                        key = d['Key']
                        version_id = d['VersionId']

                        s3_client.delete_object(
                            Bucket = p_bucket_name,
                            Key = key,
                            VersionId = version_id
                        )
                        number_of_items_restored = number_of_items_restored + 1

            is_truncated = version_list['IsTruncated']
            logging.info(f'is_truncated: {is_truncated}')

            if 'NextKeyMarker' in version_list:
                key_marker = version_list['NextKeyMarker']

        if number_of_items_restored > 0:
            has_restore_occured = True

        return has_restore_occured

    except Exception as error_message:
        raise RuntimeError(error_message)

async def main():
    if 'AWS_ACCESS_KEY_ID' in os.environ \
            and 'AWS_SECRET_ACCESS_KEY' in os.environ \
            and os.environ['AWS_ACCESS_KEY_ID'] != '' \
            and os.environ['AWS_SECRET_ACCESS_KEY'] != '':
                sqs_client = boto3.client(
                    'sqs',
                    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                    verify=False
                )
                s3_client = boto3.client(
                    's3',
                    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                    aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                    verify=False
                )
    else:
        sqs_client = boto3.client(
            'sqs',
            verify=False,
        )
        s3_client = boto3.client(
            's3',
            verify=False,
        )

    received_message = sqs_client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=['All'],
        VisibilityTimeout=10,
        WaitTimeSeconds=20,    # Wait up to 20 seconds for a message to arrive
    )

    if 'Messages' in received_message \
        and len(received_message['Messages']) > 0:
        # NOTE: Initialize email configuration
        receipient_email = 'support@example.com'
        username = receipient_email.split('@')[0]
        fullname_length = len(username.split('.'))
        fullname = f"{username.split('.')[0]}" # Group name / First name only

        if (fullname_length == 2): # First name and last name available
            fullname = f"{username.split('.')[0]} {username.split('.')[1]}"

        fullname = fullname.title()

        email_config = {
            'destination': receipient_email,
            'recipient_name': fullname,
            'subject': 'Subject: Restore Complete',
            'message': ''
        }

        try:
            receipt_handle = received_message['Messages'][0]['ReceiptHandle']
        except Exception as error_message:
            logging.error(error_message)
            send_failure_email(email_config, error_message)
            shutdown(f'{error_message}')

        try:
            data = received_message['Messages'][0]['Body']
            data = json.loads(data)
            logging.info('A SQS message for a restore has been received.')
        except Exception as error_message:
            message = f'Unable to obtain and parse message body. {error_message}'
            logging.error(message)
            send_failure_email(email_config, message)
            shutdown(f'{error_message}')

        try:
            bucket = data['bucket']
            prefix = data['prefix']
        except Exception as error_message:
            message = f'Retrieving bucket name and prefix failed. {error_message}'
            logging.error(message)
            send_failure_email(email_config, message)
            shutdown(f'{error_message}')

        try:
            logging.info(f'Initiating restore for path: {bucket}/{prefix}')
            restore_was_performed = await asyncio.create_task(restore_s3_objects(s3_client, bucket, prefix))

            if restore_was_performed is True:
                email_config['message'] = f'Hello {fullname},\n\n' \
                                    + f'The files in the path \'{bucket}/{prefix}\' have been restored. ' \

                send_email(email_config)
                logging.info('Restore complete. Shutting down.')
            else:
                logging.info('Path does not require restore. Shutting down.')
            shutdown(f'shutdown +5 "Restore successful! System will shutdown in 5 mins"')

        except Exception as error_message:
            message = f'File restoration failed. {error_message}'
            logging.error(message)
            send_failure_email(email_config, message)
            shutdown(f'{error_message}')

        try:
            sqs_client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
            )
        except Exception as error_message:
            message = f'Deleting restore session from SQS failed. {error_message}'
            logging.error(message)
            send_failure_email(email_config, message)
            shutdown(f'{error_message}')

if __name__ == '__main__':
    logging.basicConfig(filename='restore.log',level=logging.INFO)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

1 Ответ

0 голосов
/ 19 марта 2020

Единственный способ, которым я смог издеваться над Boto3, - это перестроить небольшой класс, который представляет фактическую структуру метода. Это связано с тем, что Boto3 использует динамические c методы, а все методы уровня ресурсов создаются во время выполнения.

Возможно, это не является отраслевым стандартом, но я не смог получить ни один из методов, найденных мной в Интернете. rnet, чтобы работать большую часть времени, и это работало довольно хорошо для меня и требует минимальных усилий (по сравнению с некоторыми решениями, которые я нашел).

class MockClient:
    def __init__(self, region_name, aws_access_key_id, aws_secret_access_key):
        self.region_name = region_name
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.MockS3 = MockS3()

    def client(self, service_name, **kwargs):
        return self.MockS3


class MockS3:
    def __init__(self):
        self.response = None # Test your mock data from S3 here

    def list_object_versions(self, **kwargs):
        return self.response


class S3TestCase(unittest.TestCase):
    def test_restore_s3_objects(self):

        # Given
        bucket = "testBucket" # Test this to something that somewahat realistic
        prefix = "some/prefix" # Test this to something that somewahat realistic
        env_vars = mock.patch.dict(os.environ, {"AWS_ACCESS_KEY_ID": "abc",
                                                "AWS_SECRET_ACCESS_KEY": "def"})
        env_vars.start()

        # initialising the Session can be tricy since it has to be imported from 
        # the module/file that creates the session on actual code rather than 
        # where's a Session code is. In this case you might have to import from
        # main rather than boto3.
        boto3.session.Session = mock.Mock(side_effect=[
            MockClient(region_name='eu-west-1',
                       aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                       aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'])])
        s3_client = boto3.client('s3', verify=False)

        # When
        has_restore_occured  = restore_s3_objects(s3_client, bucket, prefix)

        # Then
        self.assertEqual(has_restore_occured, False) # your expected result set
        env_vars.stop()
...