Я запускаю кластер EMR через Airflow и запускаю на нем задание PySpark. Я хочу использовать Custom AMI для загрузки кластера через Airflow. Я следую синтаксису boto3, найденному в онлайн-документах, но AMI не обнаруживается. Что-то не так с моим синтаксисом? Может ли кто-нибудь дать некоторые рекомендации относительно того, как приступить к проверке, где находится проблема?
(https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow)
- Ubuntu: 18.04,
- Python: 3.6.7,
- awscli: 1.16.166,
- Airflow: 1.10.0
- boto: 2.49.0,
- boto3: 1.9.156,
- botocore: 1.12.156
Я пытался создать кластер EMR с помощью cmd airflow в cli:
airflow test test_emr_custom_ami emr_create_cluster 2019-05-01
Кластер создан, но без указанного CustomAmiId.
from airflow.models import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.utils.trigger_rule import TriggerRule
from ast import literal_eval
from datetime import timedelta
"""
DAG to test creating an EMR cluster via Airflow
while specifying a custom AMI.
The DAG only creates and terminates an EMR cluster
in the region of aws_default airflow connection
"""
##########################################################################
##
# EMR/SPARK SETTINGS
##
##########################################################################
# Rendered by Airflow's Jinja templating at runtime (execution date, YYYY-MM-DD).
execution_dt = "{{ ds }}"
##########################################################################
##
# EMR/SPARK SETTINGS
##
##########################################################################
# Template arguments consumed by get_emr_fleet_instance_template().
emr_template_args = {
'cluster_name': 'test__custom_ami_emr_{}'.format(execution_dt),
'log_uri': 's3://xxxxxx/test_custom_ami_emr/logs',
'release_label': 'emr-5.16.0',
# EMR instance fleets, encoded as strings of comma-separated dict literals
# (parsed later by convert_string_var_to_dict). The backslashes are
# line continuations inside the string literal, not part of the value.
'master_fleet_instances': "\
{'InstanceType': 'r4.xlarge',\
'BidPriceAsPercentageOfOnDemandPrice':30,\
'WeightedCapacity': 1\
},\
{'InstanceType': 'r4.2xlarge',\
'BidPriceAsPercentageOfOnDemandPrice': 30,\
'WeightedCapacity': 1},\
{'InstanceType': 'r4.4xlarge',\
'BidPriceAsPercentageOfOnDemandPrice': 30,\
'WeightedCapacity': 1}\
",\
'core_fleet_instances': "\
{'InstanceType': 'r4.xlarge',\
'BidPriceAsPercentageOfOnDemandPrice': 30,\
'WeightedCapacity':1},\
{'InstanceType': 'r4.2xlarge',\
'BidPriceAsPercentageOfOnDemandPrice': 30,\
'WeightedCapacity': 2},\
{'InstanceType': 'r4.4xlarge',\
'BidPriceAsPercentageOfOnDemandPrice': 30,\
'WeightedCapacity': 4}\
",\
'timeout_fleet_minutes': 20,\
'core_target_spot_capacity': 11,
'master_target_spot_capacity': 1,
'emr_ec2_subnet_ids': 'subnet-xxx, subnet-xxx, subnet-xxx',
'action_on_failure': 'TERMINATE_CLUSTER',
'availability_zones': 'eu-west-1a, eu-west-1b, eu-west-1c',
'ec2_keyname': 'xxx',
'visible_to_all_users': True,
'keeps_job_alive_when_no_steps': True,
# Applications to install/run on cluster. Pass as list
# FIX: this was a set literal ({...}), which is unordered and contradicted
# the "pass as list" contract; a list keeps a deterministic order.
'applications': ['Hadoop', 'Hue', 'Spark', 'Hive', 'Ganglia'],
# Tags
'project': 'test_custom_ami_emr',
'owner': 'data_management',
'data_classification': 'confidential',
'environment': 'dev'
}
# Glue metastore configuration: point both Hive and Spark-on-Hive at the
# AWS Glue Data Catalog client factory.
metastore_configs = [
    {
        "Classification": classification,
        "Properties": {
            "hive.metastore.client.factory.class":
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        },
    }
    for classification in ("hive-site", "spark-hive-site")
]
def get_tags(args):
    """
    Build the EMR cluster tag list as Key/Value dicts.

    :param args: template args dict providing 'cluster_name', 'project',
                 'owner', 'data_classification' and 'environment'
    :return: list of {'Key': ..., 'Value': ...} dicts in a fixed order
    """
    tag_pairs = [
        ('Name', args['cluster_name']),
        ('cost_center', 'xxx'),
        ('application', 'spark'),
        ('project', args['project']),
        ('owner', args['owner']),
        ('data_classification', args['data_classification']),
        ('environment', args['environment']),
    ]
    return [{'Key': key, 'Value': value} for key, value in tag_pairs]
def get_emr_fleet_instance_template(args):
    """
    Build the boto3 ``run_job_flow`` request template for an EMR cluster
    using INSTANCE FLEETS.

    :param args: dict of template settings (see ``emr_template_args``)
    :return: dict suitable for ``EmrCreateJobFlowOperator.job_flow_overrides``
    """
    emr_apps = [{'Name': app} for app in args['applications']]
    # FIX: the original template had a top-level key
    # 'RequestedEc2AvailabilityZones ' (note the trailing space). That is
    # not a valid run_job_flow request parameter -- it appears in the
    # describe_cluster *response* -- so boto3 rejects it as an unknown
    # parameter (ParamValidationError) before settings such as CustomAmiId
    # can take effect. With instance fleets the availability zones are
    # derived from 'Ec2SubnetIds' below, so the key is removed entirely.
    template = {
        'Name': args['cluster_name'],
        'LogUri': args['log_uri'],
        'ReleaseLabel': args['release_label'],
        'Instances': {
            'InstanceFleets': [
                {
                    "Name": "MasterFleet",
                    "InstanceFleetType": "MASTER",
                    "TargetSpotCapacity": args['master_target_spot_capacity'],
                    "InstanceTypeConfigs": convert_string_var_to_dict(args['master_fleet_instances'])
                },
                {
                    "Name": "CoreFleet",
                    "InstanceFleetType": "CORE",
                    "TargetSpotCapacity": args['core_target_spot_capacity'],
                    # Fall back to on-demand capacity if spot cannot be
                    # provisioned within the timeout window.
                    "LaunchSpecifications": {
                        "SpotSpecification": {
                            "TimeoutDurationMinutes": args['timeout_fleet_minutes'],
                            "TimeoutAction": "SWITCH_TO_ON_DEMAND"
                        }
                    },
                    "InstanceTypeConfigs": convert_string_var_to_dict(args['core_fleet_instances'])
                }
            ],
            'Ec2KeyName': args['ec2_keyname'],
            'Ec2SubnetIds': args['emr_ec2_subnet_ids'].replace(' ', '').split(','),
            # Default to keeping the cluster alive when the key is unset (None).
            'KeepJobFlowAliveWhenNoSteps': True if args['keeps_job_alive_when_no_steps'] is None else args['keeps_job_alive_when_no_steps'],
            'TerminationProtected': False,
        },
        'VisibleToAllUsers': args['visible_to_all_users'],
        'JobFlowRole': 'xxx',
        'ServiceRole': 'xxx',
        'Applications': emr_apps,
        'Tags': get_tags(args),
        'Steps': [
            {
                'Name': 'Setup Debugging',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['state-pusher-script']
                }
            },
        ],
        'Configurations': metastore_configs,
        # CustomAmiId requires an emr-5.7.0+ release label. Overridable via
        # args['custom_ami_id']; defaults to the original hard-coded value
        # for backward compatibility.
        'CustomAmiId': args.get('custom_ami_id', 'ami-xxx')
    }
    return template
def convert_string_var_to_dict(text):
    """
    Parse a string of comma-separated dictionary literals into a real list
    of dictionaries.

    :param text: string shaped like "{'a': 1},{'b': 2}" (whitespace,
                 newlines and carriage returns are tolerated)
    :type text: string
    :return: list of dictionaries; [] for empty/whitespace-only input
    :rtype: list

    FIX: the original implementation split on '},' and re-appended the
    brace, and its whitespace-strip loop tested ``ch in text`` while
    replacing inside ``item``; it also raised on empty input. Wrapping the
    whole string in brackets and handing it to ``ast.literal_eval`` (safe:
    evaluates literals only, never arbitrary code) handles all of that in
    one pass.
    """
    cleaned = text.strip()
    if not cleaned:
        # Explicit empty-input path instead of a literal_eval crash.
        return []
    return list(literal_eval('[' + cleaned + ']'))
JOB_FLOW_OVERRIDES = get_emr_fleet_instance_template(emr_template_args)

# DAG declared via the context-manager form; tasks created inside the
# `with` block are attached to it automatically.
with DAG(
    "test_emr_custom_ami",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "email": ["xxx@xxx"],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
        "start_date": "2019-04-01",
    },
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Create cluster -- job_flow_overrides carries the custom AMI id.
    t_emr_create_cluster = EmrCreateJobFlowOperator(
        task_id='emr_create_cluster',
        job_flow_overrides=JOB_FLOW_OVERRIDES,  # define custom AMI here
        aws_conn_id='aws_default',
        emr_conn_id='aws_default',
        queue='light',
    )

    # Terminate cluster -- runs regardless of upstream outcome (ALL_DONE),
    # pulling the job flow id from the create task's XCom.
    t_emr_terminate_cluster = EmrTerminateJobFlowOperator(
        task_id='emr_terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull('emr_create_cluster', key='return_value') }}",
        aws_conn_id='aws_default',
        queue='light',
        trigger_rule=TriggerRule.ALL_DONE,
    )

    t_emr_create_cluster >> t_emr_terminate_cluster
Я ожидаю, что созданный кластер EMR будет иметь собственный AMI, но это не так. Я проверяю это, просматривая «Сведения о конфигурации -> Пользовательский идентификатор AMI:» в консоли AWS при выборе созданного кластера.