Как выбрать собственный AMI для EMR через Airflow - PullRequest
1 голос
/ 29 мая 2019

Я запускаю кластер EMR через Airflow и запускаю на нем задание PySpark. Я хочу использовать Custom AMI для загрузки кластера через Airflow. Я следую синтаксису boto3, найденному в онлайн-документах, но AMI не обнаруживается. Что-то не так с моим синтаксисом? Может ли кто-нибудь дать некоторые рекомендации относительно того, как приступить к проверке, где находится проблема?

(https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow)

  • Ubuntu: 18.04,
  • Python: 3.6.7,
  • awscli: 1.16.166,
  • airflow: 1.10.0,
  • boto: 2.49.0,
  • boto3: 1.9.156,
  • botocore: 1.12.156

Я пытался создать кластер EMR с помощью cmd airflow в cli:

airflow test test_emr_custom_ami emr_create_cluster 2019-05-01

Кластер создан, но без указанного CustomAmiId.

from airflow.models import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.utils.trigger_rule import TriggerRule
from ast import literal_eval
from datetime import timedelta
"""
   DAG to test creating an EMR cluster via Airflow 
   while specifying a custom AMI.

   The DAG only creates and terminates an EMR cluster
   in the region of aws_default airflow connection
"""

##########################################################################
##
#   EMR/SPARK SETTINGS
##
##########################################################################
# Jinja macro rendered by Airflow at task runtime into the execution date
# (YYYY-MM-DD); used only to make the cluster name unique per run.
execution_dt = "{{ ds }}"

# Settings consumed by get_emr_fleet_instance_template() to build the
# boto3 run_job_flow request.
emr_template_args = {
    'cluster_name': 'test__custom_ami_emr_{}'.format(execution_dt),
    'log_uri': 's3://xxxxxx/test_custom_ami_emr/logs',
    # NOTE(review): CustomAmiId is only supported on emr-5.7.0+ releases;
    # 5.16.0 qualifies.
    'release_label': 'emr-5.16.0',
    # EMR instance fleets: comma-separated Python dict literals stored in a
    # single string (backslashes are in-literal line continuations), decoded
    # later by convert_string_var_to_dict().
    'master_fleet_instances': "\
                  {'InstanceType': 'r4.xlarge',\
                  'BidPriceAsPercentageOfOnDemandPrice':30,\
                  'WeightedCapacity': 1\
                   },\
                  {'InstanceType': 'r4.2xlarge',\
                  'BidPriceAsPercentageOfOnDemandPrice': 30,\
                  'WeightedCapacity': 1},\
                  {'InstanceType': 'r4.4xlarge',\
                   'BidPriceAsPercentageOfOnDemandPrice': 30,\
                   'WeightedCapacity': 1}\
    ",
    'core_fleet_instances': "\
                   {'InstanceType': 'r4.xlarge',\
                    'BidPriceAsPercentageOfOnDemandPrice': 30,\
                    'WeightedCapacity':1},\
                    {'InstanceType': 'r4.2xlarge',\
                    'BidPriceAsPercentageOfOnDemandPrice': 30,\
                    'WeightedCapacity': 2},\
                    {'InstanceType': 'r4.4xlarge',\
                    'BidPriceAsPercentageOfOnDemandPrice': 30,\
                    'WeightedCapacity': 4}\
    ",
    # Minutes to wait for spot capacity before the fleet switches to on-demand.
    'timeout_fleet_minutes': 20,
    'core_target_spot_capacity': 11,
    'master_target_spot_capacity': 1,
    'emr_ec2_subnet_ids': 'subnet-xxx, subnet-xxx, subnet-xxx',
    'action_on_failure': 'TERMINATE_CLUSTER',
    'availability_zones': 'eu-west-1a, eu-west-1b, eu-west-1c',

    'ec2_keyname': 'xxx',
    'visible_to_all_users': True,
    'keeps_job_alive_when_no_steps': True,

    # Applications to install/run on cluster. Pass as list.
    # BUG FIX: this was a set literal ({...}), which is unordered, so the
    # application order in the request was nondeterministic; a list keeps the
    # declared order and matches the comment's intent.
    'applications': ['Hadoop', 'Hue', 'Spark', 'Hive', 'Ganglia'],

    # Tags
    'project': 'test_custom_ami_emr',
    'owner': 'data_management',
    'data_classification': 'confidential',
    'environment': 'dev'
}

# Glue metastore configuration
# Glue metastore configuration: point both Hive and Spark's Hive support at
# the AWS Glue Data Catalog client factory.
_GLUE_FACTORY = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
metastore_configs = [
    {
        "Classification": classification,
        "Properties": {"hive.metastore.client.factory.class": _GLUE_FACTORY},
    }
    for classification in ("hive-site", "spark-hive-site")
]


def get_tags(args):
    """Return the list of AWS tags to attach to the EMR cluster.

    Most values are pulled from *args*; cost_center and application are
    fixed. Each tag is emitted in the {'Key': ..., 'Value': ...} shape
    expected by run_job_flow.
    """
    pairs = [
        ('Name', args['cluster_name']),
        ('cost_center', 'xxx'),
        ('application', 'spark'),
        ('project', args['project']),
        ('owner', args['owner']),
        ('data_classification', args['data_classification']),
        ('environment', args['environment']),
    ]
    return [{'Key': key, 'Value': value} for key, value in pairs]


def get_emr_fleet_instance_template(args):
    """
    Build the boto3 ``run_job_flow`` request for an EMR cluster that uses
    INSTANCE FLEETS.

    :param args: dict with the keys shown in ``emr_template_args``.
    :return: dict suitable as ``job_flow_overrides`` for
        ``EmrCreateJobFlowOperator``.
    """
    emr_apps = [{'Name': app} for app in args['applications']]
    # BUG FIX: the original template contained a top-level key
    # 'RequestedEc2AvailabilityZones ' (note the trailing space). That is not
    # a parameter of EMR run_job_flow at all, so it could make boto3's
    # parameter validation reject or mangle the request. With instance
    # fleets, availability zones are derived from Instances.Ec2SubnetIds
    # below, so the key is removed entirely (args['availability_zones'] is
    # now unused here).
    template = {
        'Name': args['cluster_name'],
        'LogUri': args['log_uri'],
        'ReleaseLabel': args['release_label'],
        'Instances': {
            'InstanceFleets': [
                {
                    "Name": "MasterFleet",
                    "InstanceFleetType": "MASTER",
                    "TargetSpotCapacity": args['master_target_spot_capacity'],
                    "InstanceTypeConfigs": convert_string_var_to_dict(args['master_fleet_instances'])
                },
                {
                    "Name": "CoreFleet",
                    "InstanceFleetType": "CORE",
                    "TargetSpotCapacity": args['core_target_spot_capacity'],
                    "LaunchSpecifications": {
                        "SpotSpecification": {
                            # Fall back to on-demand if spot capacity is not
                            # obtained within the timeout.
                            "TimeoutDurationMinutes": args['timeout_fleet_minutes'],
                            "TimeoutAction": "SWITCH_TO_ON_DEMAND"
                        }
                    },
                    "InstanceTypeConfigs": convert_string_var_to_dict(args['core_fleet_instances'])
                }
            ],
            'Ec2KeyName': args['ec2_keyname'],
            'Ec2SubnetIds': args['emr_ec2_subnet_ids'].replace(' ', '').split(','),
            # Default to keeping the cluster alive when the flag is unset.
            'KeepJobFlowAliveWhenNoSteps': True if args['keeps_job_alive_when_no_steps'] is None else args['keeps_job_alive_when_no_steps'],
            'TerminationProtected': False,
        },
        'VisibleToAllUsers': args['visible_to_all_users'],
        'JobFlowRole': 'xxx',
        'ServiceRole': 'xxx',
        'Applications': emr_apps,
        'Tags': get_tags(args),
        'Steps': [
            {
                'Name': 'Setup Debugging',
                'ActionOnFailure': 'TERMINATE_CLUSTER',
                'HadoopJarStep': {
                    'Jar': 'command-runner.jar',
                    'Args': ['state-pusher-script']
                }
            },
        ],
        'Configurations': metastore_configs,
        # NOTE(review): requires release emr-5.7.0+ and the AMI must be based
        # on Amazon Linux — confirm the AMI meets those constraints.
        'CustomAmiId': 'ami-xxx'
    }

    return template

def convert_string_var_to_dict(text):
    """
    Parse a string of comma-separated Python dict literals into a real list
    of dictionaries.

    BUG FIX: the original split on '},', re-appended braces, and tested
    ``ch in text`` while replacing characters in ``item`` — a latent bug.
    Wrapping the whole payload in brackets and handing it to
    ``ast.literal_eval`` (safe: evaluates literals only) parses everything
    in one step and naturally tolerates whitespace, newlines, and blank
    input.

    :param text: comma-separated dict literals, e.g. "{'a': 1}, {'b': 2}"
    :type text: string
    :return: List of Dictionaries (empty list for blank input)
    :type return: List
    """
    if not text.strip():
        return []
    return literal_eval('[{}]'.format(text))


# Full run_job_flow request (including CustomAmiId), built at DAG-parse time.
JOB_FLOW_OVERRIDES = get_emr_fleet_instance_template(emr_template_args)

dag = DAG(
    "test_emr_custom_ami",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "email": ["xxx@xxx"],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
        # NOTE(review): a plain-string start_date relies on Airflow parsing
        # it implicitly; the documented form is a datetime object — confirm.
        "start_date": "2019-04-01"
    },
    schedule_interval="@daily",
    catchup=False
)

# Create cluster
t_emr_create_cluster = EmrCreateJobFlowOperator(
    task_id='emr_create_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES, # define custom AMI here
    aws_conn_id='aws_default',
    # NOTE(review): EmrHook merges the job-flow config stored in the
    # emr_conn_id connection's extras with job_flow_overrides (overrides
    # win per key) — verify the 'aws_default' connection extras are empty.
    emr_conn_id='aws_default',
    queue='light',
    dag=dag
)

# Terminate cluster — runs regardless of upstream outcome (ALL_DONE), using
# the job-flow id the create task pushed to XCom as its return value.
t_emr_terminate_cluster = EmrTerminateJobFlowOperator(
    task_id='emr_terminate_cluster',
    job_flow_id="{{ task_instance.xcom_pull('emr_create_cluster', key='return_value') }}",
    aws_conn_id='aws_default',
    queue='light',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

# Always attempt termination after creation, even if creation task failed.
t_emr_create_cluster >> t_emr_terminate_cluster

Я ожидаю, что созданный кластер EMR будет иметь собственный AMI, но это не так. Я проверяю это, просматривая «Сведения о конфигурации -> Пользовательский идентификатор AMI:» в консоли AWS при выборе созданного кластера.

...