Как проверить вывод теста воздушного потока? - PullRequest
0 голосов
/ 10 сентября 2018

Я попробовал учебную группу DAG по воздушному потоку, и она работает с планировщиком, я вижу журналы, созданные при запланированном запуске. Но если я использую тест командной строки, я не вижу вывод:

airflow test my_tutorial_2 templated 2018-09-08
[2018-09-10 15:41:43,121] {__init__.py:51} INFO - Using executor SequentialExecutor
[2018-09-10 15:41:43,281] {models.py:258} INFO - Filling up the DagBag from /Users/xiang/Documents/BigData/airflow/dags
[2018-09-10 15:41:43,338] {example_kubernetes_operator.py:54} WARNING - Could not import KubernetesPodOperator: No module named 'kubernetes'
[2018-09-10 15:41:43,339] {example_kubernetes_operator.py:55} WARNING - Install kubernetes dependencies with:     pip install airflow['kubernetes']

Это все вывод, а мой вывод не там.

Версия с воздушным потоком:

▶ pip list
Package          Version
---------------- ---------
alembic          0.8.10
apache-airflow   1.10.0

Ответы [ 3 ]

0 голосов
/ 18 сентября 2018

Если вы используете Ariflow v1.10, вы можете установить для атрибута распространения регистратора taskinstance значение True, тогда запись журнала будет распространена на корневой регистратор, использующий обработчик консоли, и распечатана в sys.stdout.

Добавить ti.log.propagate = True после строки 589 до site-packages / airflow / bin / cli.py может выполнить этот трюк.

0 голосов
/ 01 ноября 2018

Я столкнулся с этой проблемой и с AirFlow 1.10.0. Как упоминалось Louis Genasi , airflow run войдет в смертельную спираль с настройками по умолчанию и обработчиком консоли. Я подозреваю, что может быть ошибка с классом регистрации по умолчанию в 1.10.0.

Я обошел проблему, изменив обработчик ведения журнала на Python на logging.StreamHandler (который, по-видимому, используется по умолчанию в Airflow <1.10.0): </p>

'handlers': {
    'console': {
        'class': 'logging.StreamHandler',
        'formatter': 'airflow',
        'stream': 'ext://sys.stdout'
},
'loggers': {
    'airflow.processor': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'airflow.task': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'flask_appbuilder': {
        'handler': ['console'],
        'level': FAB_LOG_LEVEL,
        'propagate': True,
    }
},
'root': {
    'handlers': ['console'],
    'level': LOG_LEVEL,
}
0 голосов
/ 10 сентября 2018

С тех пор я обнаружил, что, хотя установка 'console' в качестве обработчика для регистратора airflow.task позволяет вам видеть выходные данные команд 'airflow test', кажется, что команды 'airflow run' также вызывают войти в бесконечный цикл и исчерпать память. Поэтому я бы делал это только в среде, в которой вы хотите запускать только команды проверки воздушного потока

Почему он это делает, я пока не знаю, и есть ли способ решить этот вопрос, не прерывая «поток воздуха», мне неясно

В конфигурации регистрации по умолчанию для Airflow 1.10.0 доступны следующие средства ведения журнала:

'loggers': {
    'airflow.processor': {
        'handlers': ['processor'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'airflow.task': {
        'handlers': ['task'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'flask_appbuilder': {
        'handler': ['console'],
        'level': FAB_LOG_LEVEL,
        'propagate': True,
    }
},

и регистратор airflow.task (который используется при запуске задачи) использует обработчик 'task' :

'handlers': {
    'console': {
        'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
        'formatter': 'airflow',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
        'formatter': 'airflow',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    },
},

который (если не изменен) будет записывать только выходные данные задачи в файл журнала. Если вы также хотите видеть вывод в stdout, то вам нужно добавить обработчик console в список обработчиков, используемых регистратором airflow.task :

'airflow.task': {
    'handlers': ['task', 'console'],
    'level': LOG_LEVEL,
    'propagate': False,
},

Это может быть сделано либо путем настройки пользовательского класса конфигурации ведения журнала, который переопределяет конфигурацию по умолчанию, либо путем редактирования файла настроек по умолчанию

wherever_you_installed_airflow / сайт-пакеты / воздуха / config_templates / airflow_local_settings.py

...