Я не знаю, решали ли вы когда-нибудь эту проблему, но после некоторого разочарования я закончил тем, что стал играть с воздушным потоком. Для справки, я следовал за этой статьей, чтобы заставить ее работать: https://www.astronomer.io/guides/logging/. Основная проблема заключалась в том, что регистрация воздушного потока принимает только строковый шаблон для формата регистрации, который json-logging не может подключить. Поэтому вы должны создать свои собственные классы ведения журнала и подключить его к пользовательскому классу конфигурации ведения журнала.
Скопируйте шаблон журнала здесь в папку src/config
и измените DEFAULT_CONFIG_LOGGING
на CONFIG_LOGGING
. Когда вы добьетесь успеха, поднимите поток воздуха, и вы получите сообщение о запуске воздушного потока, которое говорит Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG
. Если это первый файл .py в папке конфигурации, не забудьте добавить пустой __init__.py
файл, чтобы заставить Python его забрать
Напишите ваш собственный JsonFormatter, чтобы вставить его в ваш обработчик. Я покончил с этим один .
Написать пользовательские классы обработчиков журналов. Так как я искал протоколирование JSON, мое выглядит так:
from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger
class JsonStreamHandler(RedirectStdHandler):
def __init__(self, stream):
super(JsonStreamHandler, self).__init__(stream)
json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
self.setFormatter(json_formatter)
class JsonFileTaskHandler(FileTaskHandler):
def __init__(self, base_log_folder, filename_template):
super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
self.setFormatter(json_formatter)
class JsonFileProcessorHandler(FileProcessorHandler):
def __init__(self, base_log_folder, filename_template):
super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
self.setFormatter(json_formatter)
class JsonRotatingFileHandler(RotatingFileHandler):
def __init__(self, filename, mode, maxBytes, backupCount):
super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
self.setFormatter(json_formatter)
- Подключите их к настройкам регистрации в вашем пользовательском файле logging_config.py.
'handlers': {
'console': {
'class': 'logging_handler.JsonStreamHandler',
'stream': 'sys.stdout'
},
'task': {
'class': 'logging_handler.JsonFileTaskHandler',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'filename_template': FILENAME_TEMPLATE,
},
'processor': {
'class': 'logging_handler.JsonFileProcessorHandler',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
}
}
...
и
DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
'handlers': {
'processor_manager': {
'class': 'logging_handler.JsonRotatingFileHandler',
'formatter': 'airflow',
'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
'mode': 'a',
'maxBytes': 104857600, # 100MB
'backupCount': 5
}
}
...
Кроме того, должны выводиться журналы json, как в журналах DAG, так и на выходе.
Надеюсь, это поможет!