Настройка автоматического масштабирования EMR на основе тегов с помощью лямбда-функции - PullRequest
0 голосов
/ 14 июня 2019

У нас есть один главный файл JSON для автоматического масштабирования, который состоит из нескольких политик автоматического масштабирования , скажем, у нас есть блоки App A, App B и App C в файле JSON. Теперь нам бы хотелось иметь универсальная лямбда-функция Python , которую мы хотели бы развернуть во всех приложениях EMR, например, для приложений A, B и App C. Чтобы достичь этого, нам нужно прочитать теги лямбда-функции, используя код Pythonи код должен иметь:

  1. Использовать IF-операторы , но не уверен, как использовать lambda.tags

    для тегов в lambdafunction.tags:if tags ["emrappname"] == 'APPA': instancename = tags ["APPA"] возвращает имя экземпляра

(или)

Использовать Фильтры , но Boto3 не имеет описания для лямбда-функции

response = lambda.describe_instances(
    Filters=[
        {
            'Name': 'tag:'emrappname,
            'Values': 'AppA'
        }
    ]
)

Вот текущий пример кода, который требует обновления (прочитайтетеги лямбда, проверка тега APPA в функции лямбда и установка политики на APPA Block из главного файла JSON с автоматическим масштабированием.

from __future__ import print_function
import boto3
import json
import os
import sys
print('Loading function')
def lambda_handler(event, context):

    clusterId = event['clusterId']

    #read environment variables bucket name and config file
    s3BucketName = os.getenv('S3BucketName')
    configKey = os.getenv('ConfigKey')

    s3 = boto3.resource('s3')
    emrClient = boto3.client('emr')
    s3Client=boto3.client('s3')

    #Read Spark Submit Info from Config files
    configStream = s3.Object(s3BucketName, configKey).get()['Body']
    configContent = json.loads(configStream.read().decode('utf-8'))
    print(configContent)

    #Declare variables
    S3_BUCKET=configContent['s3']['bucket']
    S3_KEY = configContent['s3']['key']
    S3_File = configContent['s3']['file']
    EMR_LOC = "/home/hadoop"

    #fetch instanceId
    emrClient = boto3.client('emr')
    response = emrClient.describe_cluster(ClusterId=clusterId)
    print(response["Cluster"]["InstanceCollectionType"])

    instanceGroupResponse = emrClient.list_instance_groups(ClusterId=clusterId)

    instanceList = instanceGroupResponse["InstanceGroups"]

    for instance in instanceList:
        if (instance["Name"] == "Core"):
            instanceId=instance["Id"]
            print(instance)

    print("core instance group id is " + instanceId)

    S3_AS = 's3://{bucket}/{key}{file}'.format(bucket=S3_BUCKET, key=S3_KEY, file=S3_File )
    print(S3_AS)

    print("Copying files to emr")

    #Copy Autoscaling JSON file to EMR
    stepResponse = emrClient.add_job_flow_steps(
            JobFlowId=clusterId,
            Steps=[
                {
                    'Name': 'Copy autoscaling JSON to EMR',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['aws', 's3', 'cp', S3_AS, EMR_LOC]
                    }
                }
            ]    
        )

    #Setup Autoscaling policy
    #Challenge: We need to have multiple json files or is there a way to use different blocks of JSON through file:///

    stepResponse = emrClient.add_job_flow_steps(
            JobFlowId=clusterId,
            Steps=[
                {
                    'Name': 'Setup Autoscaling Policy',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['aws', 'emr', 'put-auto-scaling-policy','--cluster-id',clusterId, '--instance-group-id', instanceId,'--auto-scaling-policy', 'file:///home/hadoop/autoscaling.json']
                    }
                }
            ]    
        )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...