Я наконец нашел ответ, используя
https://stackoverflow.com/a/48969421/3808066
это большая часть работы, которую я должен был сделать еще один шаг. Я воспроизвожу здесь этот ответ и немного адаптирую его так, как я это сделал:
Некоторые вещи для проверки:
- Убедитесь, что у вас есть файл
log_config.py
, и он находится в правильном каталоге: ./config/log_config.py
.
- Убедитесь, что вы не забыли файл
__init__.py
в этом каталоге.
- Убедитесь, что вы определили обработчик
s3.task
и установите его форматер на airflow.task
- Убедитесь, что для обработчиков airflow.task и airflow.task_runner установлено значение s3.task
- Набор
task_log_reader = s3.task
в airflow.cfg
- Передайте
S3_LOG_FOLDER
на log_config
. Я сделал это с помощью переменной и извлек ее, как показано ниже log_config.py
.
Вот log_config.py, который работает:
import os
from airflow import configuration as conf
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')
LOGGING_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'airflow.task': {
'format': LOG_FORMAT,
},
'airflow.processor': {
'format': LOG_FORMAT,
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'airflow.task',
'stream': 'ext://sys.stdout'
},
'file.task': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'file.processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow.processor',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
},
'loggers': {
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.task_runner': {
'handlers': ['s3.task'],
'level': LOG_LEVEL,
'propagate': True,
},
}
}
Обратите внимание, что этот способ S3_LOG_FOLDER
можно указать в airflow.cfg
или в качестве окружения переменную AIRFLOW__CORE__S3_LOG_FOLDER
.