Я использую следующий класс для проверки объектов, существующих в s3:
class HistoryManagerS3():
def __init__(self, bucket_name):
client = boto3.Session(profile_name='my-custom-profile')
self.s3 = client.resource('s3')
self.s3_bucket = self.s3.Bucket(bucket_name)
if self.s3_bucket.creation_date:
print("Successfully connected to bucket {}".format(bucket_name))
else:
raise Exception("bucket {} doesnt exist or there are no permissions".format(bucket_name))
def check_if_object_exist(self, obj: dict) -> bool:
full_path_in_s3 = "{}/{}/{}".format(obj['vendor'], date.today(), obj['file_name'])
try:
self.s3_bucket.Object("{}".format(full_path_in_s3)).get()
except botocore.exceptions.ClientError as ex:
if ex.response['Error']['Code'] == 'NoSuchKey':
return False
return True
Я очень часто использую check_if_object_exist, поэтому я решил открыть одно соединение с s3 и работать с этим соединением. Когда я запускаю свои тесты юнитов в этом классе, и он заканчивается (успешно), я вижу следующее предупреждение:
C:\Program Files\Python37\lib\unittest\case.py:628: ResourceWarning: unclosed <ssl.SSLSocket fd=944, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('ip', 54602), raddr=('ip', 443)>
Должен ли я создавать boto3.Session каждый раз, когда я вызываю забаву c?
ПРИМЕЧАНИЕ: я заметил в своих тестах, что для каждого вызова для check_if_object_exist fun c я получаю это сообщение.