Создание кластера EMR с помощью EMRCreateJobFlowOperator - PullRequest
1 голос
/ 20 июня 2020

Я пытаюсь развернуть кластер EMR с помощью EMRCreateJobFlowOperator в Airflow, но задача постоянно терпит неудачу с 'return code -11' (см. Журнал ошибок ниже):

> [2020-06-20 10:39:24,054] {logging_mixin.py:112} INFO - Running %s on
> host %s <TaskInstance: test_emr_cluster_creation.create_job_flow
> 2020-06-20T00:00:00+00:00 [running]> richards-macbook-air.local
> [2020-06-20 10:39:24,087] {emr_create_job_flow_operator.py:66} INFO -
> Creating JobFlow using aws-conn-id: aws_default, emr-conn-id:
> emr_default [2020-06-20 10:39:33,907] {logging_mixin.py:112} INFO -
> [2020-06-20 10:39:33,906] {local_task_job.py:103} INFO - Task exited
> with return code -11

Код, который я написал, вставлен ниже:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import (
    EmrCreateJobFlowOperator,
)

DEFAULT_ARGS = {
    "owner": "XXX",
    "start_date": datetime(2020, 6, 20),
    "end_date": datetime(2020, 6, 20),
    "depends_on_past": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
    "catchup": False,
    "email": ["xxxxx@hotmail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
}

JOB_FLOW_OVERRIDES = {
    'Name': 'test_cluster',
    'ReleaseLabel': 'emr-5.28.0',
    'Instances': {
        'Ec2KeyName': 'XXX',
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'SPOT',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 3,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': True,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}

with DAG(
    dag_id="test_emr_cluster_creation",
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval="@once",
) as dag:

    # Start the cluster
    cluster_creator = EmrCreateJobFlowOperator(
        task_id="create_job_flow",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
    )

cluster_creator

Я добавил свои AWS учетные данные (пользователь-администратор) в aws_default и emr_default в Airflow. Я также создал роли по умолчанию с помощью интерфейса командной строки.

Я протестировал создание кластера с помощью интерфейса командной строки (см. Код ниже):

aws emr create-cluster --name test_cluster --use-default-roles --release-label emr-5.28.0 --instance-count 2 --applications Name=Spark Name=Zeppelin --ec2-attributes KeyName=XXX --instance-type m5.xlarge --instance-count 3 --auto-terminate

... и это отлично работает. В поисках идей, которые помогут заставить это работать, я действительно застрял.

Спасибо!

...