Я новичок в Airflow и начинаю преобразовывать код Ooz ie в Airflow. У меня есть сценарий оболочки, который преобразует содержимое двухстрочного файла в две переменные, которые необходимо передать другим задачам нисходящего потока. Вот сценарий оболочки: (он использует функцию массива для разделения каждого значения строки на собственную переменную) $ 1 - это учетная запись службы. $ 2 - это файл для cat. $ 3 - это домен. Файл actall выглядит следующим образом: 20200713 1
файл содержит дату в первой строке и номер во второй строке. В ooz ie я использую опцию вывода захвата в экшене оболочки. В Airflow я хочу использовать xcom pu sh и pull.
#!/bin/sh
export PYTHON_EGG_CACHE=./myeggs
/usr/bin/kinit -kt ${1}.keytab -V ${1}@${3}.COM
DateArgs=$(/usr/bin/hadoop fs -cat ${2})
#echo "Date1=$Date1"
saveIFS="$IFS"
IFS=$'\n'
array=($DateArgs)
IFS="$saveIFS"
echo "Date1=${array[0]}"
echo "Set=${array[1]}"
Ниже приведена моя попытка написать оператор Airflow bash для выполнения sh того же logi c в ooz ie:
Get_PipelineDates = BashOperator(
task_id="Get_PipelineDates",
bash_command='DateArgs=$(hdfs dfs -cat /dataingestionpdev/unrestricted/101-testpayer-testplatform/AppData/2020-07-06-87971440/1/Pipeline_Dates/Date1/Date.txt)
saveIFS="$IFS"
IFS=$'\n'
array=($DateArgs)
IFS="$saveIFS"
#echo "Date1=${array[0]}"
#echo "Set=${array[1]}"
echo "{{ ti.xcom_push(key="Date1", value="${array[0]}") }}" "{{ti.xcom_push(key="Set", value="${array[1]}") }}"',
dag=dag,
)
Я знаю, что это криво, потому что Airflow сообщает мне «Сломанный DAG: [/home/airflow/airflow/dags/INGPF_E2E_Initial_Dag_enhanced.py] EOL при сканировании строкового литерала (INGPF_E2E_Initial_Dag_enhanced.py 102) "
Строка 102 - это bash_command =.
Я ценю любые советы по началу работы здесь. У меня есть и другие подобные случаи, когда мне нужно записать до 20 строк в отдельные переменные для pu sh для последующих задач.
Спасибо за любую помощь и совет.
Kev