Как посмотреть логи оператора Airflow при тестировании - PullRequest
0 голосов
/ 04 июля 2019

Я создал собственный BashOperator, как этот

from airflow.operators.bash_operator import BashOperator

class CustomOperator(BashOperator):
    """ 
    Custom bash operator that just write whatever it is given as stmt 
    The actual operator is more complex
    """

    def __init__(self, stmt, **kwargs):
        cmd = 'echo %s > /path/to/some/file.txt' % stmt
        super().__init__(bash_command=cmd, **kwargs)

Тогда я создал тест для этого оператора

from datetime import datetime
from unittest import TestCase

from airflow import DAG
from airflow.models import TaskInstance

class CustomOperatorTestCase(TestCase):

    def test_execute(self):
        dag = DAG(dag_id='test', start_date=datetime.now())
        stmt = "hello world"
        task = CustomOperator(stmt=stmt, task_id='custom', dag=dag)
        ti = TaskInstance(task=task, execution_date=datetime.now())
        task.execute(ti.get_template_context())
        with open('/path/to/some/file.txt', 'r') as f:
            self.assert(f.read(), stmt)

Пока все хорошо, но давайте предположим, что у меня есть ошибка где-то в моем CustomOperator. Например, я неправильно набрал echo до eko. Единственное сообщение, которое я получаю в консоли:

ERROR: test_execute (tests.operators.test_custom_operator.CustomOperatorTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/local/airflow/tests/operators/test_shp2pgsql_operator.py", line 26, in test_execute
    result = task.execute(ti.get_template_context())
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 135, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

У меня нет способа отладки. Единственное решение, которое у меня есть, - это вызвать в веб-интерфейсе тег, содержащий эту задачу, и перейти на вкладку журналы.

Как посмотреть логи оператора в консоли при тестировании?

Ответы [ 2 ]

0 голосов
/ 04 июля 2019

Оказалось, мне просто нужно добавить обработчик в логгер airflow.task.operators в начале моего тестового файла

import logging 
import sys

log = logging.getLogger("airflow.task.operators")
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
log.addHandler(handler)
0 голосов
/ 04 июля 2019

Вы можете использовать команду проверки воздушного потока + pysnooper.Это даст вам экземпляр и подробную информацию отладки.

  1. Простой режим (просто используйте команду тестирования воздушного потока)

    Пользовательский тест воздушного потока 20190704 Приведенная выше команда будет запускать задачу независимо.и распечатайте всю информацию на консоль.

  2. Сложный режим (проверка воздушного потока + pysnooper) Вам необходимо заранее установить pysonnper с помощью pip install pysnooper.и импортируйте pysnooper в py-файл и поместите декоратор @ pysnooper.snoop () чуть ниже функции, которую вы хотите вызвать.А затем запустите

    пользовательский тест проверки воздушного потока 20190704

Вы можете детализировать журналы, каждую строку выполнения и переменную информацию создания / изменения будут распечатаны на вашей консоли.

Удачи Если вы считаете ответ полезным, пожалуйста, проголосуйте за него

...