Нет ошибки модуля, код не в пакете.Несмотря на наличие пути в PYTHONPATH - PullRequest
0 голосов
/ 26 апреля 2018

У меня небольшая проблема с путями Python при запуске DAG воздушного потока.Я очень плохо знаком с Airflow.

У меня есть следующие каталоги: я указал init .py внутри каждого каталога

-- ProjectA
    |-- code
        |__ module1.py
        |__ __init__.py
    |
    |-- dags
       |__ crawler.py   # Contains the bash operator to run a python module
       |__ __init__.py
    |
    |-- jobs
        |__ python_module.py # Contains a function that makes call to module1.py (contains the code to crawl websites) present inside Code package
        |__ __init__.py
    |
    |-- logs
    |
    |-- __init__.py

 and other Airflow files

Моя реализация Dag для BashOperator:ниже.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator



default_args = {
'owner': 'Sam',
'depends_on_past': False,
'start_date': datetime(2018, 4, 26),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=2)
}

dag = DAG('dump_aerial_image', default_args=default_args)

t1 = BashOperator(
task_id='AerialDUMP',
bash_command='python /Users/sam/App/ProjectA/jobs/python_module.py',
dag=dag
)

Когда я запускаю DAG из интерфейса Airflow.Я получаю следующую ошибку

ImportError: No module named 'code.module1'; 'code' is not a package
[2018-04-25 18:45:07,699] {base_task_runner.py:98} INFO - Subtask:     [2018-04-25 18:45:07,698] {bash_operator.py:105} INFO - Command exited with     return code 1
[2018-04-25 18:45:07,707] {models.py:1595} ERROR - Bash command failed
Traceback (most recent call last):
  File "/Users/sam/App-Setup/anaconda/envs/anaconda35/lib/python3.5/site-    packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/sam/App-Setup/anaconda/envs/anaconda35/lib/python3.5/site-packages/airflow/operators/bash_operator.py", line 109, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

Не уверен, как проверить эту ошибку.Я даже попытался добавить / Users / sam // App / ProjectA в мой путь к Python.Даже эта диета работает.Мой путь Python выглядит как

['/Users/sam/App/ProjectA/dags', '/Users/sam/App/ProjectA', '/Users/sam/App-Setup/anaconda/envs/anaconda35/lib/python35.zip', '/Users/sam/App-Setup/anaconda/envs/anaconda35/lib...........]

Не уверен, как преодолеть эту ситуацию, любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 26 апреля 2018

попробуйте запустить python /Users/sam/App/ProjectA/jobs/python_module.py.Если вы получаете ту же ошибку, это проблема с питоном, а не с потоком воздуха.

Вы используете Celery Executor?в этом случае переменная env определена во всех исполнителях?

В любом случае вы можете попробовать добавить для python_module.py sys.path.append

0 голосов
/ 26 апреля 2018

Здесь много чего происходит.

  • Нет импорта для code.module1, но в журнале возникает ошибка импорта
  • BashOperator используется длявыполнить задачу Python, not PythonOperator
  • Сбой команды bash, но, похоже, она не подключена к модулю

Так что я быпредложить:

  • Сначала попробуйте получить правильный импорт с помощью скрипта Python, не являющегося Airflow, в той же среде
  • Затем переключитесь на PythonOperator и используйте импортированную функцию - если только яЯ не вижу причины, по которой это нужно запустить с BashOperator
...