Я написал лямбда-скрипт для управления жизненным циклом образов машин Amazon с использованием python и boto3. Скрипт работает нормально, но когда я понял, что должен написать для него модульные тесты, начался мой кошмар. Я не являюсь разработчиком, и я привык писать сценарии как SysAdmin.
Я уже создал модульные тесты для функций с возвращаемым состоянием, как показано ниже, и я отлично работаю.
def get_interface_wrapper(region, service, interface_type):
interface_types = ['client', 'resource']
interface = None
if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
interface = ("boto3." + interface_type +
"(" + "service_name=service," + "region_name=region)")
return interface
def get_interface(region, service, interface_type):
return eval(get_interface_wrapper(region, service, interface_type))
#Unit tests
def test_get_interface_client(self):
service = 'ec2'
interface_expression = 'boto3.client(service_name=service,region_name=region)'
client_interface = get_interface_wrapper(
self.region, service, 'client')
self.assertEqual(client_interface, interface_expression)
def test_get_interface_resource(self):
service = 'ec2'
interface_expression = 'boto3.resource(service_name=service,region_name=region)'
resource_interface = get_interface_wrapper(
self.region, service, 'resource')
self.assertEqual(resource_interface, interface_expression)
Однако для следующих функций, которые не имеют оператора возврата и полагаются на конечную точку AWS, я изо всех сил стараюсь обернуть ее вокруг. Как я могу смоделировать конечную точку или как я могу изменить свой код, чтобы создать модульный тест, который не зависит от конечных точек AWS.
def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
for action in actions:
action.update({'phase': 'planning', 'PlanningTime': datetime.utcnow(
).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
put_log_events(logs_client, log_group, log_stream, [action])
# The tag packer_ami_state_tagging_date is not set
if (action['is_timestamp_present'] == True):
if (action['action'] == 'update'):
# The tag packer_ami_state_tagging_date is set, so update the state and tagging date
try:
ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
{'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
if (action['action'] == 'delete'):
image = ec2_client.Image(action['ImageId'])
snapshots = []
for blockDevMapping in image.block_device_mappings:
if 'Ebs' in blockDevMapping:
snapshots.append(blockDevMapping['Ebs']['SnapshotId'])
try:
image.deregister(DryRun=dryrun_enabled)
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
counter = 1
for snapshotID in snapshots:
snapshot = ec2_client.Snapshot(snapshotID)
try:
snapshot.delete(DryRun=dryrun_enabled)
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
counter += 1
if (action['action'] == 'none'):
action.update(
{'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]
put_log_events(logs_client, log_group,
log_stream, operation_result)
else:
try:
ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
{'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
def put_log_events(client, log_group_name, log_stream_name, log_events):
log_stream = client.describe_log_streams(
logGroupName=log_group_name,
logStreamNamePrefix=log_stream_name
)
if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
response = {
'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
else:
response = {}
for log_event in log_events:
if bool(response):
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{
'timestamp': int(round(time.time() * 1000)),
'message': json.dumps(log_event)
},
],
sequenceToken=response['nextSequenceToken']
)
else:
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{
'timestamp': int(round(time.time() * 1000)),
'message': json.dumps(log_event)
},
],
)