Воздушный поток - скрипт не запускается при запуске - PullRequest
0 голосов
/ 25 мая 2018

У меня есть скрипт воздушного потока, который пытается вставить данные из одной таблицы в другую, я использую Amazon Redshift DB.Приведенный ниже скрипт при запуске не выполняется.Статус Task_id остается в виде «Нет статуса» в представлении «График», и никакие другие ошибки не отображаются.

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 1, 23, 12),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB

def db_login():
    global db_conn
    try:
        db_conn = psycopg2.connect(
        " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
        print('Connection Task Complete: Connected to DB')
        return (db_conn)


#######################

def insert_data():
    cur = db_conn.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2 ;""")
    db_conn.commit()
    print('ETL Task Complete')

def job_run():
    db_login()
    insert_data()

##########################################

t1 = PythonOperator(
task_id='DBConnect',
python_callable=job_run,
bash_command='python3 ~/airflow/dags/sample.py',
dag=dag)

t1

Может кто-нибудь помочь найти причину проблемы.Спасибо

Обновлен код (05/28)

## Third party Library Imports

import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
#from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 23, 12),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('sample_dag', default_args=default_args, catchup=False, schedule_interval="@once")


#######################
## Login to DB


def data_warehouse_login():
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(
        " dbname = 'name' user = 'user' password = 'pass' host = 'host' port = 'port' sslmode = 'require' ")
    except:
        print("Connection Failed.")
        print('Connected successfully')
    return (dwh_connection)

def insert_data():
    cur = dwh_connection.cursor()
    cur.execute("""insert into tbl_1 select id,bill_no,status from tbl_2 limit 2;""")
    dwh_connection.commit()
    print('Task Complete: Insert success')


def job_run():
    data_warehouse_login()
    insert_data()



##########################################

t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run(),
    # bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag)

t1

Журнал сообщений при запуске сценария

[2018-05-28 11:36:45,300] {jobs.py:343} DagFileProcessor26 INFO - Started process (PID=26489) to work on /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:45,306] {jobs.py:534} DagFileProcessor26 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
[2018-05-28 11:36:45,310] {jobs.py:1521} DagFileProcessor26 INFO - Processing file /Users/user/airflow/dags/sample.py for tasks to queue
[2018-05-28 11:36:45,310] {models.py:167} DagFileProcessor26 INFO - Filling up the DagBag from /Users/user/airflow/dags/sample.py
/Users/user/anaconda3/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
  """)
Task Complete: Insert success
[2018-05-28 11:36:50,964] {jobs.py:1535} DagFileProcessor26 INFO - DAG(s) dict_keys(['latest_only', 'example_python_operator', 'test_utils', 'example_bash_operator', 'example_short_circuit_operator', 'example_branch_operator', 'tutorial', 'example_passing_params_via_test_command', 'latest_only_with_trigger', 'example_xcom', 'example_http_operator', 'example_skip_dag', 'example_trigger_target_dag', 'example_branch_dop_operator_v3', 'example_subdag_operator', 'example_subdag_operator.section-1', 'example_subdag_operator.section-2', 'example_trigger_controller_dag', 'insert_data2']) retrieved from /Users/user/airflow/dags/sample.py
[2018-05-28 11:36:51,159] {jobs.py:1169} DagFileProcessor26 INFO - Processing example_subdag_operator
[2018-05-28 11:36:51,167] {jobs.py:566} DagFileProcessor26 INFO - Skipping SLA check for <DAG: example_subdag_operator> because no tasks in DAG have SLAs
[2018-05-28 11:36:51,170] {jobs.py:1169} DagFileProcessor26 INFO - Processing sample_dag
[2018-05-28 11:36:51,174] {jobs.py:354} DagFileProcessor26 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
pickle_dags)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1581, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1171, in _process_dags
dag_run = self.create_dag_run(dag)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
  File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 776, in create_dag_run
if next_start <= now:
TypeError: '<=' not supported between instances of 'NoneType' and 'datetime.datetime'

View of the Airflow UI - Graph View

Журнал из представления графика

* Файл журнала не является локальным.* Загрузка здесь: http://:8793/log/sample_dag/DWH_Connect/2018-05-28T12:23:57.595234 *** Не удалось получить файл журнала с рабочего.

* Чтение удаленных журналов ... * Неподдерживаемое местоположение удаленного журнала.

Ответы [ 2 ]

0 голосов
/ 26 мая 2018

К ответу, предоставленному kaxil, я бы хотел добавить, что вы должны использовать IDE для разработки Airflow.PyCharm прекрасно работает для меня.

При этом, пожалуйста, в следующий раз обязательно просмотрите доступные поля в документации.Для PythonOperator см. Документы здесь:

https://airflow.apache.org/code.html#airflow.operators.PythonOperator

Подпись выглядит следующим образом:

class airflow.operators.PythonOperator ( python_callable , op_args = None, op_kwargs = None, provide_context = False, templates_dict = None, templates_exts = None, * args, ** kwargs)

, а для BashOperator см. документы здесь:

https://airflow.apache.org/code.html#airflow.operators.BashOperator

Подпись:

class airflow.operators.BashOperator ( bash_command , xcom_push = False, env = Нет, output_encoding = 'utf-8 ', * args, ** kwargs)

Основные моменты от меня, чтобы показать параметры, которые вы использовали.

Обязательно немного покопайтесь в документации перед использованиемОператор - моя рекомендация.

РЕДАКТИРОВАТЬ

После просмотра обновления кода остается одна вещь:

Убедитесь, что при определении python_callable взадача, которую вы делаете без скобок, в противном случае будет вызван код (что очень не интуитивно понятно, если вы не знаетемы об этом).Поэтому ваш код должен выглядеть так:

t1 = PythonOperator(
    task_id='DWH_Connect',
    python_callable=job_run,
    dag=dag)
0 голосов
/ 26 мая 2018

Вместо PythonOperator необходимо иметь BashOperator и PythonOperator.

Вы получаете сообщение об ошибке, потому что PythonOperator не имеет аргумента bash_command

t1 = PythonOperator(
    task_id='DBConnect',
    python_callable=db_login,
    dag=dag
    )

t2 = BashOperator(
    task_id='Run Python File',
    bash_command='python3 ~/airflow/dags/sample.py',
    dag=dag
    )

t1 >> t2
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...