Airflow duplicates log contents when writing to GCS
0 votes
/ 20 November 2018

I configured Airflow 1.9 to store task logs in Google Cloud Storage, following this description exactly. It works, but parts of the contents of every DAG's logs appear to be duplicated (see below). It looks as though the log was appended to itself, together with some extra upload-related lines. The log file on the local disk does not contain these duplicates.

It seems that gcs_write uses append mode by default, so the only fix I have found is to change append to False. Is there a configuration option for this? And what causes the duplication in the first place?
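For context, the relevant part of the 1.9 handler looks roughly like this (a paraphrased sketch of airflow/utils/log/gcs_task_handler.py, not the verbatim source): close() uploads with append enabled, so an extra close() re-downloads the already-uploaded log and re-appends the whole local log on top of it.

# Paraphrased sketch of Airflow 1.9's GCSTaskHandler.gcs_write: with
# append=True, the existing remote blob is read back and the new log is
# concatenated onto it before uploading.
def gcs_write(self, log, remote_log_location, append=True):
    if append:
        old_log = self.gcs_read(remote_log_location)  # download existing blob
        log = '\n'.join([old_log, log]) if old_log else log
    # ... upload `log` to remote_log_location via the GCS hook ...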

I changed the following variables in the cfg file:

task_log_reader=gcs.task
logging_config_class=log_config.LOGGING_CONFIG
remote_log_conn_id=gcs
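(For reference, these keys live in the [core] section of airflow.cfg in 1.9; spelled out here only to make the context explicit:)

[core]
task_log_reader = gcs.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = gcs

The gcs value of remote_log_conn_id refers to an Airflow connection of type Google Cloud Platform configured with a JSON key file (hence the "Getting connection using a JSON key file" lines in the log below).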

log_config.py:

import os

from airflow import configuration as conf

GCS_LOG_FOLDER = 'gs://XXXX/'

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
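The only workaround I could come up with (an untested sketch, not something from the docs) is to subclass the handler so the upload always overwrites, and point the 'class' key of the gcs.task handler above at the subclass, e.g. 'log_config.OverwritingGCSTaskHandler':

# Hypothetical workaround sketch: force overwrite instead of append.
# Assumes the 1.9 signature gcs_write(self, log, remote_log_location, append).
from airflow.utils.log.gcs_task_handler import GCSTaskHandler


class OverwritingGCSTaskHandler(GCSTaskHandler):
    def gcs_write(self, log, remote_log_location, append=True):
        # Ignore the caller's append flag; always overwrite the remote blob.
        super(OverwritingGCSTaskHandler, self).gcs_write(
            log, remote_log_location, append=False)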

The log:

*** Reading remote log from gs://XXXX/mwt1/mwt1_task1/2018-10-02T15:30:00/1.log.
[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location: 
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask:  /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed

[2018-11-16 10:27:17,304] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:17,336] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1197}} INFO - Dependencies all met for <TaskInstance: mwt1.mwt1_task1 2018-10-02 15:30:00 [queued]>
[2018-11-16 10:27:17,342] {{models.py:1407}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

[2018-11-16 10:27:17,354] {{models.py:1428}} INFO - Executing <Task(BashOperator): mwt1_task1> on 2018-10-02 15:30:00
[2018-11-16 10:27:17,355] {{base_task_runner.py:115}} INFO - Running: ['bash', '-c', 'airflow run mwt1 mwt1_task1 2018-10-02T15:30:00 --job_id 48 --raw -sd DAGS_FOLDER/mwt1.py']
[2018-11-16 10:27:17,939] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:17,938] {{__init__.py:45}} INFO - Using executor LocalExecutor
[2018-11-16 10:27:18,231] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,230] {{models.py:189}} INFO - Filling up the DagBag from /usr/local/airflow/dags/mwt1.py
[2018-11-16 10:27:18,451] {{cli.py:374}} INFO - Running on host fdfdf2f790e1
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:70}} INFO - Tmp dir root location: 
[2018-11-16 10:27:18,473] {{base_task_runner.py:98}} INFO - Subtask:  /tmp
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:80}} INFO - Temporary script location: /tmp/airflowtmp5g0d6e4h//tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y
[2018-11-16 10:27:18,474] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,473] {{bash_operator.py:88}} INFO - Running command: bdasdasdasd
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:97}} INFO - Output:
[2018-11-16 10:27:18,479] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,479] {{bash_operator.py:101}} INFO - /tmp/airflowtmp5g0d6e4h/mwt1_task1_8ob3n0y: line 1: bdasdasdasd: command not found
[2018-11-16 10:27:18,480] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,480] {{bash_operator.py:105}} INFO - Command exited with return code 127
[2018-11-16 10:27:18,488] {{models.py:1595}} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,490] {{models.py:1616}} INFO - Marking task as UP_FOR_RETRY
[2018-11-16 10:27:18,503] {{models.py:1644}} ERROR - Bash command failed
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask:   """)
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask: Traceback (most recent call last):
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-11-16 10:27:18,504] {{base_task_runner.py:98}} INFO - Subtask:     args.func(args)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:     pool=args.pool,
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:     result = func(*args, **kwargs)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-11-16 10:27:18,505] {{base_task_runner.py:98}} INFO - Subtask:   File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 109, in execute
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask:     raise AirflowException("Bash command failed")
[2018-11-16 10:27:18,506] {{base_task_runner.py:98}} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
[2018-11-16 10:27:18,515] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,515] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{discovery.py:852}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/XXXX/o/mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log?alt=media
[2018-11-16 10:27:18,535] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,535] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,537] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,537] {{client.py:893}} INFO - Refreshing access_token
[2018-11-16 10:27:18,911] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,911] {{gcp_api_base_hook.py:82}} INFO - Getting connection using a JSON key file.
[2018-11-16 10:27:18,922] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,922] {{util.py:134}} WARNING - __init__() takes at most 2 positional arguments (3 given)
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{discovery.py:852}} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/XXXX/o?name=mwt1%2Fmwt1_task1%2F2018-10-02T15%3A30%3A00%2F1.log&alt=json&uploadType=media
[2018-11-16 10:27:18,928] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,928] {{client.py:595}} INFO - Attempting refresh to obtain initial access_token
[2018-11-16 10:27:18,930] {{base_task_runner.py:98}} INFO - Subtask: [2018-11-16 10:27:18,930] {{client.py:893}} INFO - Refreshing access_token

1 Answer

0 votes
/ 21 November 2018

This is a known issue affecting both GCS and S3 remote logging; see AIRFLOW-1916. It is fixed in Airflow 1.10, so you can either upgrade or run a fork that includes the fix.
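For reference, the fix that landed in 1.10 makes close() idempotent, which is what stops the duplicate upload (paraphrased, not the verbatim source):

# Paraphrased shape of the AIRFLOW-1916 fix: the handler remembers that it
# has already been closed, so a second close() (e.g. from logging.shutdown)
# no longer appends and re-uploads the whole log a second time.
def close(self):
    if self.closed:
        return
    # ... read the local log file and upload it via gcs_write(...) ...
    self.closed = True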

...