Я смог решить эту проблему, создав макет S3 Bucket, а затем использовал его в дальнейшем в тесте.Вот завершенный тест, который, я думаю, работает как задумано:
def test_register_extracts_by_location_s3(self):
"""
Testing that when the location is s3, all the extracts are registered and set to 'ready' status.
The process/extract relationship should also be set to 'ready' since that is the last status the process set
the extracts to.
:return:
"""
process_status = aliased(ExtractStatus)
extract_status = aliased(ExtractStatus)
test_bucket = "test_bucket"
expected_keys = ["test_local_dir_1.csv", "test_local_dir_2.csv"]
client = boto3.client(
"s3",
region_name="us-east-1",
aws_access_key_id="fake_access_key",
aws_secret_access_key="fake_secret_key",
)
try:
s3 = boto3.resource(
"s3",
region_name="us-east-1",
aws_access_key_id="fake_access_key",
aws_secret_access_key="fake_secret_key",
)
s3.meta.client.head_bucket(Bucket=test_bucket)
except botocore.exceptions.ClientError:
pass
else:
err = "%s should not exist" % test_bucket
raise EnvironmentError(err)
client.create_bucket(Bucket=test_bucket)
current_dir = os.path.dirname(__file__)
fixtures_dir = os.path.join(current_dir, "fixtures")
for file in expected_keys:
key = os.path.join(test_bucket, file)
print(file)
print(key)
print(fixtures_dir)
file = os.path.join(fixtures_dir, file)
client.upload_file(Filename=file, Bucket=test_bucket, Key=key)
self.process_tracker.register_extracts_by_location(
location_path="s3://test_bucket"
)
extracts = (
self.session.query(
Extract.extract_filename,
extract_status.extract_status_name,
process_status.extract_status_name,
)
.join(
ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id
)
.join(
extract_status,
Extract.extract_status_id == extract_status.extract_status_id,
)
.join(
process_status,
ExtractProcess.extract_process_status_id
== process_status.extract_status_id,
)
.filter(
ExtractProcess.process_tracking_id
== self.process_tracker.process_tracking_run.process_tracking_id
)
)
given_result = list()
for extract in extracts:
given_result.append(
[
extract.extract_filename,
extract.extract_status_name,
extract.extract_status_name,
]
)
expected_result = [
["test_bucket/test_local_dir_1.csv", "ready", "ready"],
["test_bucket/test_local_dir_2.csv", "ready", "ready"],
]
self.assertCountEqual(expected_result, given_result)