Воздушный поток apply_defaults сообщает декоратору Аргумент обязателен - PullRequest
0 голосов
/ 05 февраля 2019

Я недавно столкнулся с этой неприятной ошибкой, когда apply_defaults декоратор от Airflow выбрасывает следующие трассировки стека ( мои **kwargs содержатjob_flow_id)

File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/mysql_import_dag.py", line 23, in <module>
    sync_dag_builder.build_sync_dag()
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/builders/sync_dag_builders/emr_sync_dag_builder.py", line 26, in build_sync_dag
    create_emr_task, terminate_emr_task = self._create_job_flow_tasks()
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/mysql_import/dags/builders/sync_dag_builders/emr_sync_dag_builder.py", line 44, in _create_job_flow_tasks
    task_id=GlobalConstants.EMR_TERMINATE_STEP)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/mnt/airflow/dags/zanalytics-airflow/src/main/aws/operators/emr_terminate_ancestor_job_flows_operator.py", line 31, in __init__
    EmrTerminateJobFlowOperator.__init__(self, *args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/contrib/operators/emr_terminate_job_flow_operator.py", line 44, in __init__
    super(EmrTerminateJobFlowOperator, self).__init__(*args, **kwargs)
  File "/home/hadoop/.pyenv/versions/3.6.6/lib/python3.6/site-packages/airflow/utils/decorators.py", line 94, in wrapper
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['job_flow_id'] is required

Тревожные части

  • В настоящее время исключение исходит от __init__ встроенного 1021 * EmrTerminateJobFlowOperator
  • Раньше это было из EmrCreateJobFlowOperator, даже если это не принимает job_flow_id параметр;но прошло с тех пор, как

Заглядывая в decorators.py, я почувствовал, что sig_cache может испортить некоторые вещи.Фактически, из коммита, который представил , я не могу понять, как кэширование сигнатур функций работает вообще (по крайней мере, это не работает в таким образом )?


Я попытался удалить все __pycache__ и перезапустить scheduler, webserver без удачи (я запускаю их в отдельных Linux screen с )

  • Что может быть причиной ошибки?
  • Как работает sig_cache и нужно ли ее принудительно очищать при любых обстоятельствах?Если да, то как это очистить?

Среда

  • Python 3.6.6
  • Airflow 1.10.2
  • LocalExecutor
...