Concurrent.futures в реализации лямбда-функции AWS - PullRequest
0 голосов
/ 26 сентября 2019

Я использую функции AWS Lambda Python для копирования снимков EBS / RDS в другой регион для аварийного восстановления.Проблема, с которой я столкнулся, это ограничение на копирование 5 снимков одновременно.Если я попытался скопировать более 5 одновременно, я получу ошибку:

botocore.exceptions.ClientError: An error occurred (ResourceLimitExceeded) when calling the CopySnapshot operation: Too many snapshot copies in progress. The limit is 5 for this destination region. 

Чтобы избежать этого, я добавляю функцию официанта, которая проверяет состояние снимка в регионе назначения и послеснимок завершен, он продолжает цикл.Он работает хорошо, но в этом случае он копирует только один снимок за раз.Вопрос в том, как реализовать модуль concurrent.futures для параллельной задачи, которая будет копировать 5 снимков одновременно?

    waiter = client_ec2_dst.get_waiter('snapshot_completed')
    message = ""

    for i in ec2_snapshots_src:
            snapshot_tags_filtered = ([item for item in i["Tags"] if item['Key'] != 'aws:backup:source-resource'])
            snapshot_tags_filtered.append({'Key': 'delete_On', 'Value': delete_on})
            snapshot_tags_filtered.append({'Key': 'src_Id', 'Value': i["SnapshotId"]})
            try:
                response = client_ec2_dst.copy_snapshot(
                    Description='[Disaster Recovery] copied from us-east-1',
                    SourceRegion=region_src,
                    SourceSnapshotId=i["SnapshotId"],
                    DryRun=False,
                #           Encrypted=True,
                #           KmsKeyId='1e287363-89f6-4837-a619-b550ff28c211',
                )
                new_snapshot_id = response["SnapshotId"]
                waiter.wait(
                    SnapshotIds=[new_snapshot_id],
                    WaiterConfig={'Delay': 5, 'MaxAttempts': 120}
                )
                snapshot_src_name = ([dic['Value'] for dic in snapshot_tags_filtered if dic['Key'] == 'Name'])
                message += ("Started copying latest EBS snapshot: " + i["SnapshotId"] + " for EC2 instance: " + str(snapshot_src_name) + " from: " + region_src + " to: " + region_dst + " with new id: " + new_snapshot_id + ".\n")

                # Adding tags to snapshots in destination region
                tag_src = [new_snapshot_id]
                tag = client_ec2_dst.create_tags(
                    DryRun=False,
                    Resources=tag_src,
                    Tags=snapshot_tags_filtered
                )
            except Exception as e:
                raise e

1 Ответ

1 голос
/ 26 сентября 2019

Вы можете использовать одновременный исполнитель и параметр max_workers, чтобы ограничить количество одновременно выполняемых заданий.Как это:

import concurrent.futures

def copy_snapshot(snapshot_id):
    waiter = client_ec2_dst.get_waiter('snapshot_completed')
    response = client_ec2_dst.copy_snapshot(
        Description='[Disaster Recovery] copied from us-east-1',
        SourceRegion=region_src,
        SourceSnapshotId=snapshot_id,
        DryRun=False
    )
    new_snapshot_id = response["SnapshotId"]
    waiter.wait(
        SnapshotIds=[new_snapshot_id],
        WaiterConfig={'Delay': 5, 'MaxAttempts': 120}
    )

# Copy snapshots in parallel, but no more than 5 at a time:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [
        executor.submit(copy_snapshot, s['SnapshotId'])
        for s in ec2_snapshots_src]
    for future in futures:
        future.result()
...