Как мы импортируем пакеты в Airflow? - PullRequest
0 голосов
/ 29 апреля 2019

Привет, я новичок в Воздушный поток , я пытаюсь импортировать мою собственную флягу настройки как DAG, созданную с помощью Talend Open Studio BigData , и у меня возникают некоторые проблемы при импорте моей DAG через терминал, ошибка не отображается, и моя группа доступности базы данных не добавляется в список группы доступности базы данных в Airflow UI

Вот мой код .py файла:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from airflow.utils.email import send_email

import os
import sys


bib_app = "/home/user/Docs/JObforAirflow/test/test_run.sh"
default_args = {
    'owner': 'yabid',
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 29),
    'email': ['user@user.com'],
    'email_on_failure': True,
    'email_on_success': True,
    'provide_context': True    }

args = {
  'owner': 'yabid'
  ,'email': ['user@user.com']
  ,'start_date': datetime(2019, 4, 25)
  , 'provide_context': True    }

dag = DAG('run_jar', default_args=default_args)

t1 = BashOperator(
    task_id='dependency',
    bash_command= bib_app,
    dag=dag)


t2 = BashOperator(
 task_id = 't2',
 dag = dag,
 bash_command = 'java -cp /home/user/Docs/JObforAirflow/test/jobbatch.jar'
 )

t1.set_upstream(t2)

Ответы [ 2 ]

1 голос
/ 29 апреля 2019

Вы скопировали этот файл DAG в ~/airflow/dags?

Все ваши *.py файлы должны быть скопированы в AIRFLOW_HOME/dags, где AIRFLOW_HOME = ~ / airflow

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from airflow.utils.email import send_email

import os
import sys


bib_app = "/home/user/Docs/JObforAirflow/test/test_run.sh"
default_args = {
    'owner': 'yabid',
    'depends_on_past': False,
    'start_date': datetime(2019, 4, 25),
    'email': ['user@user.com'],
    'email_on_failure': True,
    'email_on_success': True,
    'provide_context': True
}


dag = DAG('run_jar', default_args=default_args)

t1 = BashOperator(
    task_id='dependency',
    bash_command= bib_app,
    dag=dag)


t2 = BashOperator(
    task_id = 't2',
    dag = dag,
    bash_command = 'java -cp /home/user/Docs/JObforAirflow/test/jobbatch.jar')

t1 >> t2
0 голосов
/ 29 апреля 2019
  1. У вас есть строка 'email': ['user@user.com], с незамкнутой строкой: 'user@user.com. Если вы попытаетесь запустить этот код в Airflow, DAG завершится неудачей.
  2. Как уже упоминалось в другом ответе, вы должны поместить все свои группы DAG в папку AIRFLOW_HOME/dags. После того, как вы добавите новый файл DAG, я рекомендую перезапустить ваши airflow-scheduler и airflow-webserver
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...