Привет, ребята, я новичок в Airflow и python. Мне нужно запускать задачи на основе значения переменной на входе json. Если значение переменной « страхование » равно « истина », то нужно запустить task1, task2, task3, иначе нужно запустить task4, task5, task6. Так как я новичок ie в этом, я мало знаю об использовании PythonOperator и BranchPythonOperator.
Это мой ввод json:
{
"car": {
"engine_no": "123_st_456",
"json": "{\"make\":\"Honda\",\"model\": Jazz, \"insurance\":\"true\",\"pollution\":\"true\" }"
}
}
Код приведен ниже :
from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
import logging
import json
default_args = {
'owner': 'airflow',
'depends_on_past': False
}
dag = DAG('DAG_NAME',default_args=default_args,schedule_interval=None,max_active_runs=5, start_date=datetime(2020, 8, 4))
PythonOperator(
task_id = 'sample_task',
python_callable = 'sample_fun',
op_kwargs = {
json : '{{ dag_run.car.json}}'
},
provide_context=True,
dag = dag
)
def sample_fun( json,**kwargs):
insurance_flag = json.dumps(json)['insurance']
task1 = BashOperator(
task_id='task1',
bash_command='echo 1'
)
task2 = BashOperator(
task_id='task2',
bash_command='echo 2'
)
task3 = BashOperator(
task_id='task3',
bash_command='echo 3'
)
task4 = BashOperator(
task_id='task4',
bash_command='echo 4'
)
task5 = BashOperator(
task_id='task5',
bash_command='echo 5'
)
task6 = BashOperator(
task_id='task6',
bash_command='echo 6'
)
if insurance_flag == "true":
task1.dag = dag
task2.dag = dag
task3.dag = dag
task1 >> task2 >> task3
else:
task4.dag = dag
task5.dag = dag
task6.dag = dag
task4 >> task5 >> task6