Я создал собственный BashOperator, как этот
from airflow.operators.bash_operator import BashOperator
class CustomOperator(BashOperator):
"""
Custom bash operator that just write whatever it is given as stmt
The actual operator is more complex
"""
def __init__(self, stmt, **kwargs):
cmd = 'echo %s > /path/to/some/file.txt' % stmt
super().__init__(bash_command=cmd, **kwargs)
Тогда я создал тест для этого оператора
from datetime import datetime
from unittest import TestCase
from airflow import DAG
from airflow.models import TaskInstance
class CustomOperatorTestCase(TestCase):
def test_execute(self):
dag = DAG(dag_id='test', start_date=datetime.now())
stmt = "hello world"
task = CustomOperator(stmt=stmt, task_id='custom', dag=dag)
ti = TaskInstance(task=task, execution_date=datetime.now())
task.execute(ti.get_template_context())
with open('/path/to/some/file.txt', 'r') as f:
self.assert(f.read(), stmt)
Пока все хорошо, но давайте предположим, что у меня есть ошибка где-то в моем CustomOperator
. Например, я неправильно набрал echo
до eko
. Единственное сообщение, которое я получаю в консоли:
ERROR: test_execute (tests.operators.test_custom_operator.CustomOperatorTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/local/airflow/tests/operators/test_shp2pgsql_operator.py", line 26, in test_execute
result = task.execute(ti.get_template_context())
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 135, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
У меня нет способа отладки. Единственное решение, которое у меня есть, - это вызвать в веб-интерфейсе тег, содержащий эту задачу, и перейти на вкладку журналы.
Как посмотреть логи оператора в консоли при тестировании?