Airflow Google Cloud Logging - PullRequest
       31

Airflow Google Cloud Logging

0 голосов
/ 03 ноября 2018

Для Apache Airflow v1.10, работающий на Python2.7, с `pip install airflow [gcp_api] Я пытаюсь настроить ведение журнала для Google Cloud. У меня есть следующий файл log_config py:

GCS_LOG_FOLDER = 'gs://GCSbucket/'
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
 REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
    'airflow': {
        'format': LOG_FORMAT,
    },
},
'handlers': {
    'console': {
        'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
        'formatter': 'airflow',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    },
    # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    'gcs.task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': GCS_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
},
'loggers': {
    'airflow.processor': {
        'handlers': ['processor'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'airflow.task': {
        'handlers': ['gcs.task'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'airflow.task_runner': {
        'handlers': ['gcs.task'],
        'level': LOG_LEVEL,
        'propagate': True,
    },
    'flask_appbuilder': {
        'handler': ['console'],
        'level': FAB_LOG_LEVEL,
        'propagate': True,
    }
},
'root': {
    'handlers': ['console'],
    'level': LOG_LEVEL,
}
}

REMOTE_HANDLERS = {
's3': {
    'task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': REMOTE_BASE_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        's3_log_folder': REMOTE_BASE_LOG_FOLDER,
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    },
},
'gcs': {
    'task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    },
},
'wasb': {
    'task': {
        'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
        'wasb_container': 'airflow-logs',
        'filename_template': FILENAME_TEMPLATE,
        'delete_local_copy': False,
    },
    'processor': {
        'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
        'wasb_container': 'airflow-logs',
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        'delete_local_copy': False,
    },
}
}

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
 LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
 LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
 LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])

Мои airflow.cfg настройки:

[core]
remote_logging = True
remote_base_log_folder = gs:/GCSbucket/logs 
remote_log_conn_id = google_cloud_default 

Я получаю следующую ошибку:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/local/lib/python2.7/logging/__init__.py", line 1676, in shutdown
    h.close()
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/log/gcs_task_handler.py", line 73, in close
    if self.closed:
AttributeError: 'GCSTaskHandler' object has no attribute 'closed'

Кто-нибудь знает, что могло пойти не так? Следующее руководство: https://airflow.readthedocs.io/en/1.10.0/howto/write-logs.html

Обновление: провел дополнительные исследования исходного кода, здесь я вижу, что оператор close ничего не возвращает, и именно поэтому мое приложение аварийно завершает работу.

https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/utils/log/gcs_task_handler.py

Кто-нибудь знает, почему ничего не возвращается в

def close(self):
    if self.closed:
         return

Ответы [ 2 ]

0 голосов
/ 20 ноября 2018

Чтобы решить этот вопрос, я добавил следующее в ядро ​​airflow.cfg

[core]
log_filename_template =  {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log

# Log format
# we need to escape the curly braces by adding an additional curly brace
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s

# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =log_config.LOGGING_CONFIG
task_log_reader = gcs.task

На log_config.LOGGING_CONFIG я добавил следующее:

    # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    'gcs.task': {
        'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'gcs_log_folder': GCS_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    }

Учебное пособие: https://airflow.readthedocs.io/en/1.10.0/howto/write-logs.html

0 голосов
/ 05 ноября 2018

Инструкции могут быть устаревшими. Пожалуйста, попробуйте с инструкциями по следующей ссылке:

https://airflow.readthedocs.io/en/latest/howto/write-logs.html#writing-logs-to-google-cloud-storage

Выполните следующие действия, чтобы включить ведение журнала в Google Cloud Storage.

Чтобы включить эту функцию, необходимо настроить airflow.cfg, как в этом примере:

[core]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
  1. Сначала установите пакет gcp_api, например: pip install apache-airflow[gcp_api].
  2. Убедитесь, что в Airflow определена точка подключения Google Cloud Platform. Хук должен иметь доступ для чтения и записи к корзине Google Cloud Storage, определенной выше в remote_base_log_folder.
  3. Перезапустите веб-сервер и планировщик Airflow и запустите (или дождитесь) выполнения новой задачи.
  4. Убедитесь, что в новой корзине отображаются журналы для вновь выполненных задач.
  5. Убедитесь, что средство просмотра Google Cloud Storage работает в пользовательском интерфейсе. Подтяните недавно выполненную задачу и убедитесь, что вы видите что-то вроде:

-

*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
...