I set up Airflow 1.9 to store task logs in Google Cloud Storage, following this description exactly. It works, but part of the content of every DAG log appears to be duplicated (see below). It looks as though the log was appended to itself, together with some extra upload information. The log file on the local disk does not contain these duplicates.
It seems that gcs_write uses append mode by default, so the only workaround I have found is to change it to False. Is there a configuration option for this? And what causes the duplication in the first place?
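Here is a self-contained sketch of the mechanics I suspect. This is hypothetical, not Airflow's actual code: `gcs_write`, the in-memory `store` dict, and the call sequence are all illustrative. If the handler uploads the full local log on every close, and close fires more than once per attempt (for example once for the raw `airflow run --raw` process and once for the wrapper), then append mode duplicates the entire log in the remote copy:

```python
# Hypothetical sketch of append-mode remote writes (not Airflow's code):
# read the existing remote object, concatenate the new log, write it back.
def gcs_write(store, path, log, append=True):
    old = store.get(path, "") if append else ""
    store[path] = old + log

store = {}
local_log = "line-1\nline-2\n"

# The full local log is uploaded once per close(); a second close()
# in append mode duplicates the whole log, as in the output below:
gcs_write(store, "1.log", local_log)
gcs_write(store, "1.log", local_log)
assert store["1.log"] == local_log * 2  # whole log appears twice

# With append=False the second upload simply overwrites the first:
gcs_write(store, "1.log", local_log, append=False)
assert store["1.log"] == local_log
```

Under this assumption, switching to overwrite mode hides the symptom, but the underlying cause would be the double upload, not the append itself.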
I changed the following variables in the cfg file:
task_log_reader=gcs.task
logging_config_class=log_config.LOGGING_CONFIG
remote_log_conn_id=gcs
log_config.py:
import os

from airflow import configuration as conf

GCS_LOG_FOLDER = 'gs://XXXX/'
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout',
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    },
}
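Airflow imports this dict through `logging_config_class` and feeds it to the standard-library `logging.config.dictConfig`. One way to sanity-check the dict's shape outside Airflow is to run a stripped-down copy; in this sketch the `airflow.*` handler classes are replaced with plain `logging.StreamHandler` so it runs without Airflow installed (only the structure is being validated, not the GCS upload):

```python
import logging
import logging.config

# Stripped-down copy of the config above: the Airflow handler classes are
# swapped for StreamHandlers, since only the dict shape is being checked.
TEST_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {'format': '[%(asctime)s] %(levelname)s - %(message)s'},
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout',
        },
        'gcs.task': {  # stand-in for GCSTaskHandler
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout',
        },
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['gcs.task'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}

logging.config.dictConfig(TEST_CONFIG)
task_logger = logging.getLogger('airflow.task')
# The task logger should carry exactly the configured handler and not
# propagate to the root logger (which would double-print every record).
assert task_logger.handlers and not task_logger.propagate
```

Note that `propagate: False` on `airflow.task` matters here: with propagation on, each record would also reach the root `console` handler and appear twice in the local output, which is a separate effect from the remote duplication above.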
The log:
*** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location:
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: """)
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: args.func(args)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: pool=args.pool,
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = func(*args, **kwargs)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: result = task_copy.execute(context=context)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask: File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: raise AirflowException("Bash command failed")
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
[2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token