airflow 1.10.0 branchpythonoperator run failed: команда Celery не выполнена - PullRequest
0 голосов
/ 12 октября 2018

Я копирую пример кода с воздушным потоком example_branch_dop_operator_v3 в свой собственный тестовый файл test1_v2, я могу успешно запустить example_branch_dop_operator_v3, но выполнить test1_v2 не удалось.dag test1_v2 код (AIRFLOW_HOME / dags / test1.py):

import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

dag = DAG(dag_id='test1_v2'
          schedule_interval='*/1 * * * *', default_args=args)


def should_run(ds, **kwargs):

    print('------------- exec dttm = {} and minute = {}'.
          format(kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "oper_1"
    else:
        return "oper_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag)

oper_1 = DummyOperator(
    task_id='oper_1',
    dag=dag)
oper_1.set_upstream(cond)

oper_2 = DummyOperator(
    task_id='oper_2',
    dag=dag)
oper_2.set_upstream(cond)

команда airflow run test1_v2 condition "2018-09-01 00:00:00", есть рабочий журнал:

[2018-10-11 21: 20: 29,991]{cli.py:492} INFO - Запуск на хосте CenT
[2018-10-11 21: 23: 10,879] {settings.py:174} INFO - setting.configure_orm (): использование настроек пула.pool_size = 5, pool_recycle = 1800
[2018-10-11 21: 23: 11,343] { init .py: 51} ИНФОРМАЦИЯ - Использование исполнителя CeleryExecutor
[2018-10-11 21: 23: 11,572] {cli.py:478} ИНФОРМАЦИЯ - Идентификатор загрузки идентификатора 26
Отслеживание (последний последний вызов):
Файл "/ home / airflow / airflow / venv / bin / airflow", строка 32, в
args.func (args)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py", строка 74, в оболочке
return f (* args, ** kwargs)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py", строка 480, в работе
DagPickle) .filter (DagPickle.id == args.pickle) .first ()
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py ", строка 2755, в первом
ret = list (self [0: 1])
File" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py ", строка 2547, в getitem
список возврата (res)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy / orm / loading.py ", строка 90, в случаях
util.raise_from_cause (ошибка)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py ", строка 203, в Raise_from_cause
ререйз (тип (исключение), исключение, tb = exc_tb, причина = причина)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py ", строка 187, в ререйзе
повысить значение
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm / loading.py ", строка 75, в случаях
строки = [proc (строка) для строки в выборке]
файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py ", строка 75, в
строках = [proc (строка) для выбранной строки]
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages / sqlalchemy / orm / loading.py ", строка 452, в _instance
loaded_instance, populate_existing, populators)
Файл" /home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy / orm / loading.py ", строка 513, в _populate_full
dict_ [key] = getter (row)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", строка 1540, впроцесс
возврат загрузки (значение)
файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", строка 316, в нагрузках
возврат загрузки(файл, игнорировать)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", строка 304, в загрузке
obj = pik.load ()
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", строка 465, в find_class
return StockUnpickler.find_class (self, module, name)
ImportError: Ни один модуль с именем необыкновенный_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1 '
[2018-10-11 21: 23: 11,823: команда ERROR / ForkPoolWorker-6] execute_com51 обнаружила недавний последний вызов CalledProcessErcess500_5051 execute_com51 * последним вызовом 10 * * ** Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", строка 60, в файле execute_command
close_fds = True, env = env)
Файл "/data/python35/lib/python3.5/subprocess.py", строка 271, в check_call
повышение CalledProcessError (retcode, cmd)
subprocess.CalledProcessError: Command'условие воздушного потока test1_v1 условие 2018-09-01T10: 00: 00 + 08: 00 --pickle 26 --local' вернул ненулевой статус выхода 1
[2018-10-11 21: 23: 11,895: ОШИБКА /ForkPoolWorker-6] Нет
[2018-10-11 21: 23: 12,103: ОШИБКА / ForkPoolWorker-6] Задача airflow.executors.celery_executor.execute_command [efb4ef09-bdf8-4123-85c8-4dc73dc19d74] возникла неожиданно: исключение AirflowException («Команда Celery завершилась неудачно»,)
Traceback (последний вызов был последним):
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", строка 60, в execute_command
close_fds = True, env = env)
Файл "/data/python35/lib/python3.5/subprocess.py", строка 271, в check_call
вызвать CalledProcessError (retcode, cmd)
подпроцесс.CalledProcessError: Команда 'airflow run test1_v1 условие 2018-09-01T10: 00: 00 + 08: 00 --pickle 26 --local' вернул ненулевое состояние выхода 1

Во время обработки вышеуказанного исключения другоевозникла исключительная ситуация:

Трассировка (последний последний вызов):
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py",строка 375, в trace_task
R = retval = fun (* args, ** kwargs)
File "/ home / airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py ", строка 632, в protected_call
return self.run (* args, ** kwargs)
Файл "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", строка 65, в execute_command
повышение AirflowException («Ошибка команды Celery»)
airflow.exceptions.AirflowException: команда Celery не выполнена

Почему dag test2_v1 может не работать?спасибо.

1 Ответ

0 голосов
/ 12 октября 2018

Когда я использую python_callable=range для замены python_callable=should_run ,, успешно запустите этот dag ,, поэтому я предполагаю, что причина в том, что воздушный поток не может найти must_run, как показано в журнале ImportError: No module named 'unusual_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1'

Решение:

  • если вы используете команду, вы должны использовать airflow backfill test1_v2 -s 20180901 -e 20180902 -x документацию
  • Нет такой проблемы в случае запуска планировщика воздушного потока
...