Воздушный поток - динамическое прохождение соединения s sh для нескольких кластеров - PullRequest
0 голосов
/ 14 апреля 2020

У меня есть группа доступности базы данных, которая выполняет обработку данных для нескольких рабочих таблиц. Он раскручивает кластер EMR на AWS для каждой таблицы, а затем обрабатывает данные.

Это прекрасно работает, если вы запускаете каждую таблицу отдельно, но я бы хотел запустить их параллельно. Моя проблема в том, что когда я запускаю их параллельно, я не могу отделить emr_connect S SH Hook. Как именно я могу параметризовать это для каждой таблицы и передать их в ssh_hook для каждой SSH_Operator задачи?

args = {"owner": "airflow", "start_date": datetime(2020, 3, 1), "retries": 1}

dag = DAG(dag_id="mydag", default_args=args)

s3 = boto3.resource("s3")
s3Client = boto3.client("s3")
emr_connect = SSHHook(ssh_conn_id="parquet_aws_emr")
path_prefix = Variable.get("path_prefix")

result = s3Client.get_object(Bucket="mybucket", Key=f"{path_prefix}/myfile.json")
tables = json.loads(result["Body"].read().decode())
date = datetime.now()
year_month = (date.replace(day=1) - timedelta(days=1)).replace(day=1).strftime("%Y%m")

scripts = [
    "dist_parquet_copy.sh",
    "parquet_start_stack.sh",
    "CF_parquet_emr_cluster_m5d.yaml",
    "parquet_final_copy.sh",
]

executables = [
    "dist_parquet_copy.sh",
    "parquet_start_stack.sh",
    "CF_parquet_emr_cluster_m5d.yaml",
    "parquet_final_copy.sh",
    "get_cluster_id.sh",
]

bash_call = """ {{ params.shell_path }} {{ params.input_value }} """

# start stack
copy_scripts = PythonOperator(
    task_id="copy_scripts",
    python_callable=copyreqscript.copyrequiredscripts,
    dag=dag,
    op_kwargs={
        "scripts": scripts,
        "executables": executables,
        "input_path": Variable.get("path_prefix")
        + "/parquet_converter/",
        "output_path": "/home/ec2-user/scripts/",
    },
)

for table in tables["tables"]:
    start_stack = BashOperator(
        task_id=table["table_name"] + "_start_stack",
        bash_command=""" {{ params.shell_path }} {{ params.input_value }} {{ var.value.path_prefix }}""",
        params={
            "shell_path": "~/scripts/parquet_start_stack.sh",
            "input_value": table["table_name"] + "-converter",
        },
        dag=dag,
    )

    get_cluster_id = BashOperator(
        task_id=table["table_name"] + "_get_cluster_id",
        xcom_push=True,
        bash_command=bash_call,
        params={
            "shell_path": "~/scripts/get_cluster_id.sh",
            "input_value": table["table_name"] + "-converter",
        },
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag,
    )

    wait_for_cluster = BashOperator(
        task_id=table["table_name"] + "_wait_for_cluster",
        xcom_push=True,
        bash_command=f""" {{{{ params.shell_path }}}} {{{{ ti.xcom_pull(task_ids='{table["table_name"]}_get_cluster_id') }}}} """,
        params={"shell_path": "~/scripts/sellout_wait_for_cluster.sh"},
        dag=dag,
    )

    update_connection = BashOperator(
        task_id=table["table_name"] + "_update_connection",
        bash_command=f""" {{{{ params.shell_path }}}} {{{{ ti.xcom_pull(task_ids='{table["table_name"]}_wait_for_cluster') }}}} """,
        params={"shell_path": "~/scripts/parquet_emr_connection_update.sh"},
        dag=dag,
    )

    emr_copy_s3 = SSHOperator(
        task_id=table["table_name"] + "_emr_copy_s3",
        ssh_hook=emr_connect,
        timeout=300,
        command=""" sudo aws s3 cp s3://mybucket/{{ params.environment }}/parquet_converter/ /home/hadoop/scripts/ --recursive""",
        params={"environment": path_prefix},
        dag=dag,
    )

    emr_executable_file = SSHOperator(
        task_id=table["table_name"] + "_emr_executable_file",
        ssh_hook=emr_connect,
        command=""" sudo chmod +x /home/hadoop/scripts/dist_parquet_copy.sh """,
        dag=dag,
    )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...