У меня есть группа доступности базы данных, которая выполняет обработку данных для нескольких рабочих таблиц. Он раскручивает кластер EMR на AWS для каждой таблицы, а затем обрабатывает данные.
Это прекрасно работает, если вы запускаете каждую таблицу отдельно, но я бы хотел запустить их параллельно. Моя проблема в том, что когда я запускаю их параллельно, я не могу отделить emr_connect
S SH Hook. Как именно я могу параметризовать это для каждой таблицы и передать их в ssh_hook
для каждой SSH_Operator
задачи?
args = {"owner": "airflow", "start_date": datetime(2020, 3, 1), "retries": 1}
dag = DAG(dag_id="mydag", default_args=args)
s3 = boto3.resource("s3")
s3Client = boto3.client("s3")
emr_connect = SSHHook(ssh_conn_id="parquet_aws_emr")
path_prefix = Variable.get("path_prefix")
result = s3Client.get_object(Bucket="mybucket", Key=f"{path_prefix}/myfile.json")
tables = json.loads(result["Body"].read().decode())
date = datetime.now()
year_month = (date.replace(day=1) - timedelta(days=1)).replace(day=1).strftime("%Y%m")
scripts = [
"dist_parquet_copy.sh",
"parquet_start_stack.sh",
"CF_parquet_emr_cluster_m5d.yaml",
"parquet_final_copy.sh",
]
executables = [
"dist_parquet_copy.sh",
"parquet_start_stack.sh",
"CF_parquet_emr_cluster_m5d.yaml",
"parquet_final_copy.sh",
"get_cluster_id.sh",
]
bash_call = """ {{ params.shell_path }} {{ params.input_value }} """
# start stack
copy_scripts = PythonOperator(
task_id="copy_scripts",
python_callable=copyreqscript.copyrequiredscripts,
dag=dag,
op_kwargs={
"scripts": scripts,
"executables": executables,
"input_path": Variable.get("path_prefix")
+ "/parquet_converter/",
"output_path": "/home/ec2-user/scripts/",
},
)
for table in tables["tables"]:
start_stack = BashOperator(
task_id=table["table_name"] + "_start_stack",
bash_command=""" {{ params.shell_path }} {{ params.input_value }} {{ var.value.path_prefix }}""",
params={
"shell_path": "~/scripts/parquet_start_stack.sh",
"input_value": table["table_name"] + "-converter",
},
dag=dag,
)
get_cluster_id = BashOperator(
task_id=table["table_name"] + "_get_cluster_id",
xcom_push=True,
bash_command=bash_call,
params={
"shell_path": "~/scripts/get_cluster_id.sh",
"input_value": table["table_name"] + "-converter",
},
trigger_rule=TriggerRule.ALL_DONE,
dag=dag,
)
wait_for_cluster = BashOperator(
task_id=table["table_name"] + "_wait_for_cluster",
xcom_push=True,
bash_command=f""" {{{{ params.shell_path }}}} {{{{ ti.xcom_pull(task_ids='{table["table_name"]}_get_cluster_id') }}}} """,
params={"shell_path": "~/scripts/sellout_wait_for_cluster.sh"},
dag=dag,
)
update_connection = BashOperator(
task_id=table["table_name"] + "_update_connection",
bash_command=f""" {{{{ params.shell_path }}}} {{{{ ti.xcom_pull(task_ids='{table["table_name"]}_wait_for_cluster') }}}} """,
params={"shell_path": "~/scripts/parquet_emr_connection_update.sh"},
dag=dag,
)
emr_copy_s3 = SSHOperator(
task_id=table["table_name"] + "_emr_copy_s3",
ssh_hook=emr_connect,
timeout=300,
command=""" sudo aws s3 cp s3://mybucket/{{ params.environment }}/parquet_converter/ /home/hadoop/scripts/ --recursive""",
params={"environment": path_prefix},
dag=dag,
)
emr_executable_file = SSHOperator(
task_id=table["table_name"] + "_emr_executable_file",
ssh_hook=emr_connect,
command=""" sudo chmod +x /home/hadoop/scripts/dist_parquet_copy.sh """,
dag=dag,
)