У меня Airflow 1.9.0, и я пытаюсь установить для журналов S3.
Я создаю пользовательскую папку config config / init .py config / log_config.py
Файл log_config.py таков:
import os
from airflow import configuration as conf
# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
S3_LOG_FOLDER = conf.get('core', 's3_log_folder').upper()
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout'
},
'file.task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
# 'file.processor': {
# 'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
# 'formatter': 'airflow.processor',
# 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
#'filename_template': PROCESSOR_FILENAME_TEMPLATE,
# },
# When using s3 or gcs, provide a customized LOGGING_CONFIG
# in airflow_local_settings within your PYTHONPATH, see UPDATING.md
# for details
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
}
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
# 'airflow.processor': {
# 'handlers': ['file.processor'],
# 'level': LOG_LEVEL,
# 'propagate': True,
# },
'airflow.task': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': True,
},
}
}
У меня есть файл airflow.cfg таким образом
remote_base_log_folder = s3://MYBUCKET/airflow
s3_log_folder = s3://MYBUCKET/airflow
remote_log_conn_id = MyS3Conn
task_log_reader = s3.task
logging_config_class = $AIRFLOW_HOME/config/log_config.LOGGING_CONFIG
remote_logging = False
encrypt_s3_logs = False
logging_level = INFO
Также в пользовательском интерфейсе я настроил соединение с S3, какэто
Когда я запускаю pipenv, запускаю веб-сервер воздушного потока, он запускается нормально и показывает мне, что на экране
[2018-12-06 00:38:59,345] {logging_config.py:55} INFO - Successfully imported user-defined logging config from log_config.LOGGING_CONFIG
Но если я вижу, что корзина пуста.
Любая помощь, пожалуйста!Заранее спасибо!