Airflow не будет писать логи на s3 - PullRequest
0 голосов
/ 08 мая 2018

Я пробовал разные способы настройки Airflow 1.9 для записи логов в s3, но он просто игнорирует это. После этого я обнаружил, что у многих людей возникают проблемы с чтением журналов, однако моя проблема заключается в том, что журналы остаются локальными. Я могу прочитать их без проблем, но они не находятся в указанном сегменте s3.

Сначала я попытался записать в файл airflow.cfg

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False

Затем я попытался установить переменные окружения

AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

Однако он игнорируется и файлы журнала остаются локальными.

Я запускаю поток воздуха из контейнера, я адаптировал https://github.com/puckel/docker-airflow к своему случаю, но он не будет записывать логи в s3. Я использую соединение aws для записи в сегменты в dags, и это работает, но журналы просто остаются локальными, независимо от того, запускаю ли я его на EC2 или локально на моей машине.

Ответы [ 2 ]

0 голосов
/ 28 марта 2019

Еще одна вещь, которая приводит к этому поведению (Воздушный поток 1.10):

Если вы посмотрите на airflow.utils.log.s3_task_handler.S3TaskHandler, вы заметите, что есть несколько условий, при которых журналы без вывода сообщений не будут записываться на S3:

1) Экземпляр регистратора уже close() d (не уверен, как это происходит на практике)
2) Файл журнала не существует на локальном диске (так я дошел до этой точки)

Вы также заметите, что регистратор работает в многопроцессорной / многопоточной среде, и что Airflow S3TaskHandler и FileTaskHandler делают с файловой системой некоторые вещи, которые совершенно не нужны. Если предположения о файлах журналов на диске соблюдены, файлы журналов S3 не будут записываться, и об этом событии ничего не записывается и не выдается. Если у вас есть конкретные, четко определенные потребности в журналировании, это может быть хорошей идеей реализовать все свои собственные logging Handlers (см. документы по python logging) и отключить все обработчики журнала Airflow (см. Airflow UPDATING.md).

0 голосов
/ 19 мая 2018

Я наконец нашел ответ, используя https://stackoverflow.com/a/48969421/3808066 это большая часть работы, которую я должен был сделать еще один шаг. Я воспроизвожу здесь этот ответ и немного адаптирую его так, как я это сделал:

Некоторые вещи для проверки:

  1. Убедитесь, что у вас есть файл log_config.py, и он находится в правильном каталоге: ./config/log_config.py.
  2. Убедитесь, что вы не забыли файл __init__.py в этом каталоге.
  3. Убедитесь, что вы определили обработчик s3.task и установите его форматер на airflow.task
  4. Убедитесь, что для обработчиков airflow.task и airflow.task_runner установлено значение s3.task
  5. Набор task_log_reader = s3.task в airflow.cfg
  6. Передайте S3_LOG_FOLDER на log_config. Я сделал это с помощью переменной и извлек ее, как показано ниже log_config.py.

Вот log_config.py, который работает:

import os

from airflow import configuration as conf


LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
       's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

Обратите внимание, что этот способ S3_LOG_FOLDER можно указать в airflow.cfg или в качестве окружения переменную AIRFLOW__CORE__S3_LOG_FOLDER.

...