Is Airflow certified to work with a Microsoft SQL Server backend?
I am setting up Airflow to work with a SQL Server backend running in Azure.
Apparently the "IS 0" syntax does not run when I test it with SQL Workbench against SQL Server. Is this a bug?
I am getting errors with basic jobs (the example jobs):
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
ProgrammingError: (pyodbc.ProgrammingError) ('42000', u"[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '0'. (102) (SQLExecDirectW)") [SQL: u'SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf \nFROM dag_run \nWHERE dag_run.dag_id= ? AND dag_run.external_trigger IS 0 ORDER BY dag_run.execution_date DESC'] [parameters: ('example_bash_operator',)] (Background on this error at: http://sqlalche.me/e/f405)
[2018-11-17 00:31:27,689] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-11-17 00:31:27,689] {dag_processing.py:559} INFO - Processor for /home/sshuser/airflow/dags/example_spark_datareceiver.py finished
[2018-11-17 00:31:27,689] {dag_processing.py:578} WARNING - Processor for /home/sshuser/airflow/dags/example_spark_datareceiver.py exited with return code 1. See /home/sshuser/airflow/logs/scheduler/2018-11-17/example_spark_datareceiver.py.log for details.
[2018-11-17 00:31:27,692] {dag_processing.py:627} INFO - Started a process (PID: 93539) to generate tasks for /home/sshuser/airflow/dags/example_spark_datareceiver.py - logging into /home/sshuser/airflow/logs/scheduler/2018-11-17/example_spark_datareceiver.py.log
[2018-11-17 00:31:27,693] {jobs.py:1440} INFO - Heartbeating the executor
This happens for all DAGs (even hello world) when the scheduler is started:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 7, 17),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}


def format_hello(**kwargs):
    return 'Hello from Python !! Current execution time is ' + kwargs['execution_date'].strftime('%Y-%m-%d')


with DAG('hello-world-dag', schedule_interval=timedelta(minutes=5), catchup=False, default_args=default_args) as dag:
    # Define the task that prints hello with the bash operator.
    t1 = BashOperator(
        task_id='hello_from_bash',
        bash_command='echo Hello world from Bash !!')
    # Define a task that does nothing.
    t2 = DummyOperator(task_id='noop')
    # Define the task that prints hello using Python code.
    t3 = PythonOperator(task_id='hello_from_python', python_callable=format_hello, provide_context=True)
    # Define the DAG structure.
    t1 >> t2 >> t3
More logs:
[2018-11-19 23:57:01,848] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-11-19 23:57:01,848] {dag_processing.py:559} INFO - Processor for /home/sshuser/airflow/dags/hello-world-dag.py finished
[2018-11-19 23:57:01,848] {dag_processing.py:578} WARNING - Processor for /home/sshuser/airflow/dags/hello-world-dag.py exited with return code 1. See /home/sshuser/airflow/logs/scheduler/2018-11-19/hello-world-dag.py.log for details.
[2018-11-19 23:57:01,851] {dag_processing.py:627} INFO - Started a process (PID: 68567) to generate tasks for /home/sshuser/airflow/dags/hello-world-dag.py - logging into /home/sshuser/airflow/logs/scheduler/2018-11-19/hello-world-dag.py.log
[2018-11-19 23:57:01,852] {jobs.py:1440} INFO - Heartbeating the executor
Process DagFileProcessor7-Process:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 1171, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 799, in create_dag_run
    last_run = dag.get_last_dagrun(session=session)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/models.py", line 2797, in get_last_dagrun
    last = qry.first()
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2895, in first
    ret = list(self[0:1])
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2687, in __getitem__
    return list(res)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2995, in __iter__
    return self._execute_and_instances(context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3018, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
    context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 265, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
ProgrammingError: (pyodbc.ProgrammingError) ('42000', u"[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '0'. (102) (SQLExecDirectW)") [SQL: u'SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf \nFROM dag_run \nWHERE dag_run.dag_id = ? AND dag_run.external_trigger IS 0 ORDER BY dag_run.execution_date DESC'] [parameters: ('hello-world-dag',)] (Background on this error at: http://sqlalche.me/e/f405)
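To narrow down the failing predicate, here is a minimal sketch using only the standard-library sqlite3 module. The table is a hypothetical, cut-down stand-in for Airflow's dag_run (not the real schema), and the helper name matching_ids is my own. It suggests that SQLite accepts IS with an arbitrary right-hand operand, and that for this filter "= 0" selects the same rows as "IS 0". As far as I know, T-SQL only allows IS in front of NULL, which would explain why the statement above is rejected by SQL Server while the "= 0" form is not.

```python
import sqlite3

# Hypothetical, cut-down stand-in for Airflow's dag_run table (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    "id INTEGER PRIMARY KEY, dag_id TEXT, external_trigger INTEGER)"
)
conn.executemany(
    "INSERT INTO dag_run (dag_id, external_trigger) VALUES (?, ?)",
    [
        ("hello-world-dag", 0),     # scheduled run
        ("hello-world-dag", 1),     # externally triggered run
        ("hello-world-dag", None),  # flag never set
    ],
)


def matching_ids(predicate):
    """Return the ids of rows for which the given WHERE predicate holds."""
    sql = (
        "SELECT id FROM dag_run WHERE dag_id = ? AND "
        + predicate
        + " ORDER BY id"
    )
    return [row[0] for row in conn.execute(sql, ("hello-world-dag",))]


# SQLite treats IS as a NULL-safe equality operator, so this parses and runs.
ids_is = matching_ids("external_trigger IS 0")
# The plain comparison is the form that other backends also accept.
ids_eq = matching_ids("external_trigger = 0")

print(ids_is, ids_eq)
```

Both predicates skip the row whose flag is NULL, so replacing "IS 0" with "= 0" in the generated query should not change which runs are returned. Whether that change belongs in Airflow or in how the query is built with SQLAlchemy is an assumption on my part and would need checking against the installed versions.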