I set up an Airflow server with remote logging to S3 enabled. This works very well for basic tasks (for example: bash_command = "echo Hello world").
But when I run a Talend job and it completes successfully, I only get the local logs and nothing appears in S3. If the job fails, the logs do show up in S3. Here is an example of a failed job's log, as stored in S3:
[2020-02-20 14:15:48,284] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: test_etl2.test_etl_t1 2020-02-20T14:15:40.125565+00:00 [queued]>
[2020-02-20 14:15:48,298] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: test_etl2.test_etl_t1 2020-02-20T14:15:40.125565+00:00 [queued]>
[2020-02-20 14:15:48,299] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-02-20 14:15:48,299] {taskinstance.py:867} INFO - Starting attempt 1 of 4
[2020-02-20 14:15:48,299] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-02-20 14:15:48,311] {taskinstance.py:887} INFO - Executing <Task(BashOperator): test_etl_t1> on 2020-02-20T14:15:40.125565+00:00
[2020-02-20 14:15:48,313] {standard_task_runner.py:53} INFO - Started process 706 to run task
[2020-02-20 14:15:48,378] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: test_etl2.test_etl_t1 2020-02-20T14:15:40.125565+00:00 [running]> ip-172-31-40-66.eu-west-3.compute.internal
[2020-02-20 14:15:48,399] {bash_operator.py:82} INFO - Tmp dir root location:
/tmp
[2020-02-20 14:15:48,401] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmpTK_Rnm/test_etl_t18ga7XV
[2020-02-20 14:15:48,401] {bash_operator.py:115} INFO - Running command: /home/airflow/jobs/test_etl/test_etl/test_etl_run.sh --context_param fileparam=/home/airflow/jobs/files/prod.propertie
[2020-02-20 14:15:48,406] {bash_operator.py:122} INFO - Output:
[2020-02-20 14:15:48,512] {bash_operator.py:126} INFO -
[2020-02-20 14:15:48,512] {bash_operator.py:126} INFO - Job name : test_etl
[2020-02-20 14:15:48,512] {bash_operator.py:126} INFO - Version : 0.1
[2020-02-20 14:15:48,535] {bash_operator.py:126} INFO - Start date : 2020-02-20 14:15:48
[2020-02-20 14:15:48,535] {bash_operator.py:126} INFO -
[2020-02-20 14:15:48,538] {bash_operator.py:126} INFO - Exception in component tFileInputDelimited_1 (test_etl)
[2020-02-20 14:15:48,539] {bash_operator.py:126} INFO - java.io.FileNotFoundException: /home/airflow/jobs/files/prod.propertie (No such file or directory)
[2020-02-20 14:15:48,539] {bash_operator.py:126} INFO - at java.io.FileInputStream.open0(Native Method)
[2020-02-20 14:15:48,539] {bash_operator.py:126} INFO - at java.io.FileInputStream.open(FileInputStream.java:195)
[2020-02-20 14:15:48,539] {bash_operator.py:126} INFO - at java.io.FileInputStream.<init>(FileInputStream.java:138)
[2020-02-20 14:15:48,539] {bash_operator.py:126} INFO - at java.io.FileInputStream.<init>(FileInputStream.java:93)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - at org.talend.fileprocess.TOSDelimitedReader.<init>(TOSDelimitedReader.java:88)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - at org.talend.fileprocess.FileInputDelimited.<init>(FileInputDelimited.java:164)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - at local_project.test_etl_0_1.test_etl.tFileInputDelimited_1Process(test_etl.java:1050)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - at local_project.test_etl_0_1.test_etl.tJava_2Process(test_etl.java:736)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - at local_project.test_etl_0_1.test_etl.runJobInTOS(test_etl.java:3826)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - at local_project.test_etl_0_1.test_etl.main(test_etl.java:3596)
[2020-02-20 14:15:48,540] {bash_operator.py:126} INFO - the end is near
[2020-02-20 14:15:48,543] {bash_operator.py:130} INFO - Command exited with return code 4
[2020-02-20 14:15:48,555] {taskinstance.py:1128} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/bash_operator.py", line 134, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2020-02-20 14:15:48,556] {taskinstance.py:1151} INFO - Marking task as UP_FOR_RETRY
[2020-02-20 14:15:58,281] {logging_mixin.py:112} INFO - [2020-02-20 14:15:58,281] {local_task_job.py:103} INFO - Task exited with return code 1
And here is the log of a successful job, which was not saved to S3:
[2020-02-20 15:18:30,887] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: test_etl2.test_etl_t1 2020-02-20T15:18:26.507449+00:00 [queued]>
[2020-02-20 15:18:30,902] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: test_etl2.test_etl_t1 2020-02-20T15:18:26.507449+00:00 [queued]>
[2020-02-20 15:18:30,902] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-02-20 15:18:30,902] {taskinstance.py:867} INFO - Starting attempt 1 of 4
[2020-02-20 15:18:30,902] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-02-20 15:18:30,914] {taskinstance.py:887} INFO - Executing <Task(BashOperator): test_etl_t1> on 2020-02-20T15:18:26.507449+00:00
[2020-02-20 15:18:30,916] {standard_task_runner.py:53} INFO - Started process 6283 to run task
[2020-02-20 15:18:30,984] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: test_etl2.test_etl_t1 2020-02-20T15:18:26.507449+00:00 [running]> ip-172-31-40-66.eu-west-3.compute.internal
[2020-02-20 15:18:31,008] {bash_operator.py:82} INFO - Tmp dir root location:
/tmp
[2020-02-20 15:18:31,010] {bash_operator.py:105} INFO - Temporary script location: /tmp/airflowtmpIyj_DD/test_etl_t1CMnxRs
[2020-02-20 15:18:31,010] {bash_operator.py:115} INFO - Running command: /home/airflow/jobs/test_etl/test_etl/test_etl_run.sh --context_param fileparam=/home/airflow/jobs/files/prod.properties
[2020-02-20 15:18:31,014] {bash_operator.py:122} INFO - Output:
[2020-02-20 15:18:31,125] {bash_operator.py:126} INFO - ****************************************
[2020-02-20 15:18:31,125] {bash_operator.py:126} INFO - Job name : test_etl
[2020-02-20 15:18:31,125] {bash_operator.py:126} INFO - Version : 0.1
[2020-02-20 15:18:31,148] {bash_operator.py:126} INFO - Start date : 2020-02-20 15:18:31
[2020-02-20 15:18:31,148] {bash_operator.py:126} INFO - ****************************************
[2020-02-20 15:18:31,799] {bash_operator.py:126} INFO - Connexion OK !
[2020-02-20 15:18:31,799] {bash_operator.py:126} INFO - ****************************************
[2020-02-20 15:18:31,799] {bash_operator.py:126} INFO - Job name : test_etl
[2020-02-20 15:18:31,799] {bash_operator.py:126} INFO - Version : 0.1
[2020-02-20 15:18:31,799] {bash_operator.py:126} INFO - End date : 2020-02-20 15:18:31
[2020-02-20 15:18:31,800] {bash_operator.py:126} INFO - ****************************************
[2020-02-20 15:18:31,804] {bash_operator.py:130} INFO - Command exited with return code 0
[2020-02-20 15:18:31,818] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=test_etl2, task_id=test_etl_t1, execution_date=20200220T151826, start_date=20200220T151830, end_date=20200220T151831
[2020-02-20 15:18:40,884] {logging_mixin.py:112} INFO - [2020-02-20 15:18:40,884] {local_task_job.py:103} INFO - Task exited with return code 0
And finally, the DAG file:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 19, 14),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'test_etl2',
    default_args=default_args,
    description='Test Talend job',
    schedule_interval=timedelta(days=1)
)

t1 = BashOperator(
    task_id='test_etl_t1',
    bash_command="/home/airflow/jobs/test_etl/test_etl/test_etl_run.sh --context_param fileparam=" + Variable.get("fileparam") + " ",
    dag=dag,
)

t2 = BashOperator(
    task_id='test_etl_t2',
    bash_command='echo "Je suis un cochon de lait voila mon grouin"',
    dag=dag,
)

t1 >> t2
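
For anyone trying to reproduce this, it may help to check per attempt whether a log object actually exists in S3. The following is only a minimal sketch, not part of my setup: it assumes the default Airflow 1.10 `log_filename_template` (`{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log`), and the bucket name and prefix in the usage note are hypothetical placeholders for whatever `remote_base_log_folder` points to. The helper names `split_s3_url`, `expected_log_key` and `log_exists` are made up for illustration; `log_exists` needs boto3 and valid AWS credentials.

```python
def split_s3_url(url):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    if not url.startswith("s3://"):
        raise ValueError("not an S3 URL: %r" % url)
    bucket, _, prefix = url[len("s3://"):].partition("/")
    return bucket, prefix.strip("/")


def expected_log_key(prefix, dag_id, task_id, execution_ts, try_number):
    """Build the key where a task attempt's log is expected.

    Assumes the default Airflow 1.10 template:
    {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
    execution_ts is the ISO timestamp as printed in the task log.
    """
    parts = [dag_id, task_id, execution_ts, "%d.log" % try_number]
    if prefix:
        parts.insert(0, prefix.strip("/"))
    return "/".join(parts)


def log_exists(bucket, key):
    """Return True if the object exists in S3 (requires boto3 and credentials)."""
    # Imported lazily so the helpers above work without boto3 installed.
    import boto3
    from botocore.exceptions import ClientError

    try:
        boto3.client("s3").head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as err:
        if err.response["Error"]["Code"] == "404":
            return False
        raise
```

With a hypothetical `remote_base_log_folder` of `s3://my-bucket/airflow/logs`, calling `split_s3_url` on it and then `expected_log_key(prefix, "test_etl2", "test_etl_t1", "2020-02-20T15:18:26.507449+00:00", 1)` gives the key to pass to `log_exists` for the successful run above.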
If anyone has a solution, I would appreciate it. :)
Thanks in advance, Jean-Christophe