Форматирование журналов воздушного потока в формате JSON - PullRequest
0 голосов
/ 27 октября 2018

У меня есть требование для записи журналов Apache Airflow на стандартный вывод в формате JSON. Воздушный поток, кажется, не проецирует эту возможность из коробки. Я нашел пару модулей Python, которые способны выполнить эту задачу, но я не могу заставить реализацию работать.

В настоящее время я применяю класс в airflow/utils/logging.py для изменения регистратора, показанного ниже:

from pythonjsonlogger import jsonlogger

class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
    jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

def process_log_record(self, log_record):
    if log_record.get('level'):
        log_record['severity'] = log_record['level']
        del log_record['level']
    else: 
        log_record['severity'] = log_record['levelname']
        del log_record['levelname']
    if log_record.get('asctime'):
        log_record['timestamp'] = log_record['asctime']
        del log_record['asctime']
    now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    log_record['nanotime'] = now
    return super(StackdriverJsonFormatter, self).process_log_record(log_record)

Я реализую этот код в /airflow/settings.py, как показано ниже:

from airflow.utils import logging as logconf

def configure_logging(log_format=LOG_FORMAT):
     handler = logconf.logging.StreamHandler(sys.stdout)
     formatter = logconf.StackdriverJsonFormatter()
     handler.setFormatter(formatter)
     logging = logconf.logging.getLogger()
     logging.addHandler(handler)
''' code below was original airflow source code
     logging.root.handlers = []
     logging.basicConfig(
         format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
'''

Я пробовал пару разных вариантов этого и не могу заставить python-json-logger преобразовать журналы в JSON. Возможно, я не попадаю в root logger? Другой вариант, который я рассмотрел, - это ручное форматирование журналов в строку JSON. Не повезло с этим еще либо. Будем благодарны за любые альтернативные идеи, советы или поддержку.

Ура!

Ответы [ 2 ]

0 голосов
/ 03 апреля 2019

Я не знаю, решали ли вы когда-нибудь эту проблему, но после некоторого разочарования я закончил тем, что стал играть с воздушным потоком. Для справки, я следовал за этой статьей, чтобы заставить ее работать: https://www.astronomer.io/guides/logging/. Основная проблема заключалась в том, что регистрация воздушного потока принимает только строковый шаблон для формата регистрации, который json-logging не может подключить. Поэтому вы должны создать свои собственные классы ведения журнала и подключить его к пользовательскому классу конфигурации ведения журнала.

  1. Скопируйте шаблон журнала здесь в папку src/config и измените DEFAULT_CONFIG_LOGGING на CONFIG_LOGGING. Когда вы добьетесь успеха, поднимите поток воздуха, и вы получите сообщение о запуске воздушного потока, которое говорит Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. Если это первый файл .py в папке конфигурации, не забудьте добавить пустой __init__.py файл, чтобы заставить Python его забрать

  2. Напишите ваш собственный JsonFormatter, чтобы вставить его в ваш обработчик. Я покончил с этим один .

  3. Написать пользовательские классы обработчиков журналов. Так как я искал протоколирование JSON, мое выглядит так:

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)
  1. Подключите их к настройкам регистрации в вашем пользовательском файле logging_config.py.
'handlers': {
    'console': {
        'class': 'logging_handler.JsonStreamHandler',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'logging_handler.JsonFileTaskHandler',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'logging_handler.JsonFileProcessorHandler',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
}
...

и

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    }
...

Кроме того, должны выводиться журналы json, как в журналах DAG, так и на выходе.

Надеюсь, это поможет!

0 голосов
/ 27 октября 2018

Я предполагаю, что вам не нужно создавать сам процесс Airflow для вывода журналов на стандартный вывод, и достаточно иметь какой-то другой процесс, который выводит журналы Airflow на стандартный вывод.

Вы можете написать сценарий, который ожидаетчтобы новые журналы появлялись в $AIRFLOW_HOME/logs, считывает и преобразует их в JSON и выводит преобразованные журналы в стандартный вывод.

...