Я копирую пример кода с воздушным потоком example_branch_dop_operator_v3
в свой собственный тестовый файл test1_v2, я могу успешно запустить example_branch_dop_operator_v3, но выполнить test1_v2 не удалось.dag test1_v2 код (AIRFLOW_HOME / dags / test1.py):
import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': True,
}
dag = DAG(dag_id='test1_v2'
schedule_interval='*/1 * * * *', default_args=args)
def should_run(ds, **kwargs):
print('------------- exec dttm = {} and minute = {}'.
format(kwargs['execution_date'], kwargs['execution_date'].minute))
if kwargs['execution_date'].minute % 2 == 0:
return "oper_1"
else:
return "oper_2"
cond = BranchPythonOperator(
task_id='condition',
provide_context=True,
python_callable=should_run,
dag=dag)
oper_1 = DummyOperator(
task_id='oper_1',
dag=dag)
oper_1.set_upstream(cond)
oper_2 = DummyOperator(
task_id='oper_2',
dag=dag)
oper_2.set_upstream(cond)
команда airflow run test1_v2 condition "2018-09-01 00:00:00"
, есть рабочий журнал:
[2018-10-11 21: 20: 29,991]{cli.py:492} INFO - Запуск на хосте CenT
[2018-10-11 21: 23: 10,879] {settings.py:174} INFO - setting.configure_orm (): использование настроек пула.pool_size = 5, pool_recycle = 1800
[2018-10-11 21: 23: 11,343] { init .py: 51} ИНФОРМАЦИЯ - Использование исполнителя CeleryExecutor
[2018-10-11 21: 23: 11,572] {cli.py:478} ИНФОРМАЦИЯ - Идентификатор загрузки идентификатора 26
Отслеживание (последний последний вызов):
Файл "/ home / airflow / airflow / venv / bin / airflow", строка 32, в
args.func (args)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py", строка 74, в оболочке
return f (* args, ** kwargs)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py", строка 480, в работе
DagPickle) .filter (DagPickle.id == args.pickle) .first ()
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py ", строка 2755, в первом
ret = list (self [0: 1])
File" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py ", строка 2547, в getitem
список возврата (res)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy / orm / loading.py ", строка 90, в случаях
util.raise_from_cause (ошибка)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py ", строка 203, в Raise_from_cause
ререйз (тип (исключение), исключение, tb = exc_tb, причина = причина)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py ", строка 187, в ререйзе
повысить значение
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm / loading.py ", строка 75, в случаях
строки = [proc (строка) для строки в выборке]
файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py ", строка 75, в
строках = [proc (строка) для выбранной строки]
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages / sqlalchemy / orm / loading.py ", строка 452, в _instance
loaded_instance, populate_existing, populators)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy / orm / loading.py ", строка 513, в _populate_full
dict_ [key] = getter (row)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", строка 1540, впроцесс
возврат загрузки (значение)
файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", строка 316, в нагрузках
возврат загрузки(файл, игнорировать)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", строка 304, в загрузке
obj = pik.load ()
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", строка 465, в find_class
return StockUnpickler.find_class (self, module, name)
ImportError: Ни один модуль с именем необыкновенный_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1 '
[2018-10-11 21: 23: 11,823: команда ERROR / ForkPoolWorker-6] execute_com51 обнаружила недавний последний вызов CalledProcessErcess500_5051 execute_com51 * последним вызовом 10 * * ** Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", строка 60, в файле execute_command
close_fds = True, env = env)
Файл "/data/python35/lib/python3.5/subprocess.py", строка 271, в check_call
повышение CalledProcessError (retcode, cmd)
subprocess.CalledProcessError: Command'условие воздушного потока test1_v1 условие 2018-09-01T10: 00: 00 + 08: 00 --pickle 26 --local' вернул ненулевой статус выхода 1
[2018-10-11 21: 23: 11,895: ОШИБКА /ForkPoolWorker-6] Нет
[2018-10-11 21: 23: 12,103: ОШИБКА / ForkPoolWorker-6] Задача airflow.executors.celery_executor.execute_command [efb4ef09-bdf8-4123-85c8-4dc73dc19d74] возникла неожиданно: исключение AirflowException («Команда Celery завершилась неудачно»,)
Traceback (последний вызов был последним):
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", строка 60, в execute_command
close_fds = True, env = env)
Файл "/data/python35/lib/python3.5/subprocess.py", строка 271, в check_call
вызвать CalledProcessError (retcode, cmd)
подпроцесс.CalledProcessError: Команда 'airflow run test1_v1 условие 2018-09-01T10: 00: 00 + 08: 00 --pickle 26 --local' вернул ненулевое состояние выхода 1
Во время обработки вышеуказанного исключения другоевозникла исключительная ситуация:
Трассировка (последний последний вызов):
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py",строка 375, в trace_task
R = retval = fun (* args, ** kwargs)
File "/ home / airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py ", строка 632, в protected_call
return self.run (* args, ** kwargs)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", строка 65, в execute_command
повышение AirflowException («Ошибка команды Celery»)
airflow.exceptions.AirflowException: команда Celery не выполнена
Почему dag test2_v1 может не работать?спасибо.