Как смоделировать объект StreamingBody в boto3 для обработки с помощью BytesIO в Python? - PullRequest
0 голосов
/ 20 октября 2019

Я тестирую модуль, который преобразует элемент из объекта S3 в панду DataFrame, и мне нужно смоделировать возвращенный объект StreamingBody из boto3

file.py

def object_to_df(self, key_name, dtypes):
    s3_object = self.get_object(key_name=key_name)
    if s3_object is not None:
        object_df = pandas.read_csv(
            io.BytesIO(s3_object["Body"].read()), dtype=dtypes
        )
        return object_df

Ответ self.get_object (key_name) задокументирован здесь

{
    'Body': StreamingBody(),
    'DeleteMarker': True|False,
    'AcceptRanges': 'string',
    ...
}

Так что мне нужно смоделировать этот объект StreamingBody () и заставить мою функцию mock вернуть его.

test.py

import unittest
import pandas
from io import StringIO
from unittest.mock import patch, Mock
from path.to.file import custom_class
from botocore.response import StreamingBody

class TestS3Class(unittest.TestCase):
    """TestCase for path_to/file.py"""

    def setUp(self):
        """Creates an instance of the live class for testing"""
        self.s3_test_client = S3()


    @patch('path.to.class.get_object')
    def test_object_to_df(self, mock_get_object):
        """"""
        mock_response = {'Body': [{'Candidate': 'Black Panther', 'Votes': 3},
                        {'Candidate': 'Captain America: Civil War', 'Votes': 8},
                        {'Candidate': 'Guardians of the Galaxy', 'Votes': 8},
                        {'Candidate': "Thor: Ragnarok", 'Votes': 1}
                    ]}
        mock_stream = StreamingBody(StringIO(str(mock_response)), len(str(mock_response)))
        mock_get_object.return_value = mock_stream
        self.assertIsInstance(self.s3_test_client.object_to_df(key_name='key_name', dtypes=str), pandas.DataFrame)

Но я сталкиваюсь с TypeError: 'StreamingBody' object is not subscriptable

Есть подсказки?

...