Как заглушить S3Transfer в python - PullRequest
       61

Как заглушить S3Transfer в python

2 голосов
/ 01 февраля 2020

Используя botocore.stub и mock_s3 из moto, я могу заглушить клиента boto3 S3, как показано ниже,

from moto import mock_s3
from botocore.stub import Stubber

class TestS3(unittest.TestCase):
    @mock_s3
    def setUp(self):
        self.s3_client = boto3.client('s3', region_name='us-west-1')
        self.stubber = Stubber(client=self.s3_client)

Затем в тестовых функциях я могу использовать add_client_error и add_response, как показано ниже, и при необходимости добавьте утверждения,

@mock_s3
def test_put_object_exception(self):
    self.stubber.add_client_error('put_object', service_error_code='500')
    with self.stubber:
        self.s3_service.put_object(data='/some/dummy/path', key='/some/dummy/key')

Однако в моем классе S3 я использую S3Transfer upload_file для загрузки файлов в S3, есть ли способ смоделировать метод S3Transfer (self.s3_client) .upload_file?

1 Ответ

0 голосов
/ 09 апреля 2020

Я обнаружил, что заглушка put_object работает хорошо. В примере 'test_file' должен быть реальным файлом, так как S3Transfer проверяет, существует ли он. S3Transfer будет использовать другие методы обслуживания, если файл, который он передает, достаточно велик, чтобы оправдать загрузку, состоящую из нескольких частей, но его копирование начинает больше походить на написание тестов для S3Transfer, чем на тестирование моего собственного кода.

import botocore.session
from botocore.stub import Stubber, ANY
from boto3.s3.transfer import S3Transfer
s3 = botocore.session.get_session().create_client('s3')
stubber = Stubber(s3)

BUCKET='mock_bucket'
KEY='mock_key'                       
put_object_response = {   'ETag': '"14fe4f49fffffffffff9afbaaaaaaaa9"',
    'ResponseMetadata': {   'HTTPHeaders': {   'content-length': '0',
                                               'date': 'Wed, 08 Apr 2020 '
                                                       '20:35:42 GMT',
                                               'etag': '"14fe4f49fffffffffff9afbaaaaaaaa9"',
                                               'server': 'AmazonS3',
                                               },
                            'HTTPStatusCode': 200,
                            'HostId': 'GEHrJmjk76Ug/clCVUwimbmIjTTb2S4kU0lLg3Ylj8GKrAIsv5+S7AFb2cRkCLd+mpptmxfubLM=',
                            'RequestId': 'A8FFFFFFF84C3A77',
                            'RetryAttempts': 0},
    'VersionId': 'Dbc0gbLVEN4N5F4oz7Hhek0Xd82Mdgyo'}
stubber.add_response('put_object', put_object_response,expected_params= {'Body': ANY, 'Bucket':BUCKET,'Key':KEY})
stubber.activate()
transfer = S3Transfer(s3)
transfer.upload_file('test_file', BUCKET,KEY)
...