У нас есть один главный файл JSON для автоматического масштабирования, который состоит из нескольких политик автоматического масштабирования , скажем, у нас есть блоки App A, App B и App C в файле JSON. Теперь нам бы хотелось иметь универсальная лямбда-функция Python , которую мы хотели бы развернуть во всех приложениях EMR, например, для приложений A, B и App C. Чтобы достичь этого, нам нужно прочитать теги лямбда-функции, используя код Pythonи код должен иметь:
Использовать IF-операторы , но не уверен, как использовать lambda.tags
для тегов в lambdafunction.tags:if tags ["emrappname"] == 'APPA': instancename = tags ["APPA"] возвращает имя экземпляра
(или)
Использовать Фильтры , но Boto3 не имеет описания для лямбда-функции
response = lambda.describe_instances(
Filters=[
{
'Name': 'tag:'emrappname,
'Values': 'AppA'
}
]
)
Вот текущий пример кода, который требует обновления (прочитайтетеги лямбда, проверка тега APPA в функции лямбда и установка политики на APPA Block из главного файла JSON с автоматическим масштабированием.
from __future__ import print_function
import boto3
import json
import os
import sys
print('Loading function')
def lambda_handler(event, context):
clusterId = event['clusterId']
#read environment variables bucket name and config file
s3BucketName = os.getenv('S3BucketName')
configKey = os.getenv('ConfigKey')
s3 = boto3.resource('s3')
emrClient = boto3.client('emr')
s3Client=boto3.client('s3')
#Read Spark Submit Info from Config files
configStream = s3.Object(s3BucketName, configKey).get()['Body']
configContent = json.loads(configStream.read().decode('utf-8'))
print(configContent)
#Declare variables
S3_BUCKET=configContent['s3']['bucket']
S3_KEY = configContent['s3']['key']
S3_File = configContent['s3']['file']
EMR_LOC = "/home/hadoop"
#fetch instanceId
emrClient = boto3.client('emr')
response = emrClient.describe_cluster(ClusterId=clusterId)
print(response["Cluster"]["InstanceCollectionType"])
instanceGroupResponse = emrClient.list_instance_groups(ClusterId=clusterId)
instanceList = instanceGroupResponse["InstanceGroups"]
for instance in instanceList:
if (instance["Name"] == "Core"):
instanceId=instance["Id"]
print(instance)
print("core instance group id is " + instanceId)
S3_AS = 's3://{bucket}/{key}{file}'.format(bucket=S3_BUCKET, key=S3_KEY, file=S3_File )
print(S3_AS)
print("Copying files to emr")
#Copy Autoscaling JSON file to EMR
stepResponse = emrClient.add_job_flow_steps(
JobFlowId=clusterId,
Steps=[
{
'Name': 'Copy autoscaling JSON to EMR',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_AS, EMR_LOC]
}
}
]
)
#Setup Autoscaling policy
#Challenge: We need to have multiple json files or is there a way to use different blocks of JSON through file:///
stepResponse = emrClient.add_job_flow_steps(
JobFlowId=clusterId,
Steps=[
{
'Name': 'Setup Autoscaling Policy',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 'emr', 'put-auto-scaling-policy','--cluster-id',clusterId, '--instance-group-id', instanceId,'--auto-scaling-policy', 'file:///home/hadoop/autoscaling.json']
}
}
]
)