Airflow SQL Server backend compatibility
17 November 2018

Is Airflow certified to work with a Microsoft SQL Server backend?

I am setting up Airflow to use a SQL Server backend running in Azure.
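In case the connection string matters, here is a minimal sanity check of the `sql_alchemy_conn` URI shape I am using (server, database, and credentials below are placeholders, not my real values); SQLAlchemy only picks its SQL Server dialect when the scheme starts with `mssql`:

```python
from urllib.parse import urlsplit  # on Python 2.7: from urlparse import urlsplit

# Placeholder connection string in the same shape as my airflow.cfg entry.
conn = ("mssql+pyodbc://airflowuser:password@myserver.database.windows.net:1433/"
        "airflowdb?driver=ODBC+Driver+17+for+SQL+Server")

scheme = urlsplit(conn).scheme
# "mssql" selects the SQL Server dialect; "+pyodbc" selects the DBAPI driver.
print(scheme)  # mssql+pyodbc
assert scheme.startswith("mssql"), "SQLAlchemy would not use the MSSQL dialect"
```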

The generated `IS 0` syntax apparently does not run when tested against SQL Server with SQL Workbench. Is this a bug?
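For reference, SQL Server rejects the statement because T-SQL only allows `IS` in `IS [NOT] NULL`; a comparison against a `BIT` column has to be written with `=`. When I test the generated query by hand in SQL Workbench I patch it with a throwaway helper like this (not part of Airflow, just for reproducing):

```python
import re

def rewrite_boolean_is(sql):
    # T-SQL has no "IS 0" / "IS 1"; rewrite them to "= 0" / "= 1".
    # "IS NULL" / "IS NOT NULL" are untouched because NULL doesn't match [01].
    return re.sub(r"\bIS\s+([01])\b", r"= \1", sql)

failing = ("SELECT TOP 1 dag_run.state FROM dag_run "
           "WHERE dag_run.dag_id = ? AND dag_run.external_trigger IS 0 "
           "ORDER BY dag_run.execution_date DESC")
# "external_trigger IS 0" becomes "external_trigger = 0", which SQL Server accepts.
print(rewrite_boolean_is(failing))
```

With that one change the query runs in SQL Workbench, which is why I suspect the SQL generation rather than the server.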

I get errors even with the basic jobs (the example DAGs):

  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
ProgrammingError: (pyodbc.ProgrammingError) ('42000', u"[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '0'. (102) (SQLExecDirectW)") [SQL: u'SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf \nFROM dag_run \nWHERE dag_run.dag_id= ? AND dag_run.external_trigger IS 0 ORDER BY dag_run.execution_date DESC'] [parameters: ('example_bash_operator',)] (Background on this error at: http://sqlalche.me/e/f405)
[2018-11-17 00:31:27,689] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-11-17 00:31:27,689] {dag_processing.py:559} INFO - Processor for /home/sshuser/airflow/dags/example_spark_datareceiver.py finished
[2018-11-17 00:31:27,689] {dag_processing.py:578} WARNING - Processor for /home/sshuser/airflow/dags/example_spark_datareceiver.py exited with return code 1. See /home/sshuser/airflow/logs/scheduler/2018-11-17/example_spark_datareceiver.py.log for details.
[2018-11-17 00:31:27,692] {dag_processing.py:627} INFO - Started a process (PID: 93539) to generate tasks for /home/sshuser/airflow/dags/example_spark_datareceiver.py - logging into /home/sshuser/airflow/logs/scheduler/2018-11-17/example_spark_datareceiver.py.log
[2018-11-17 00:31:27,693] {jobs.py:1440} INFO - Heartbeating the executor

This happens for all DAGs (even hello world) when the scheduler starts:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 7, 17),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

def format_hello(**kwargs):
    return 'Hello from Python !! Current execution time is ' + kwargs['execution_date'].strftime('%Y-%m-%d')

with DAG('hello-world-dag', schedule_interval=timedelta(minutes=5), catchup=False, default_args=default_args) as dag:
    # Define the task that prints hello with the bash operator.
    t1 = BashOperator(
        task_id='hello_from_bash',
        bash_command='echo Hello world from Bash !!')

    # Define a task that does nothing.
    t2 = DummyOperator(task_id='noop')


    # Define the task that prints hello using Python code.
    t3 = PythonOperator(task_id='hello_from_python', python_callable=format_hello, provide_context=True)

    # Define the DAG structure.
    t1 >> t2 >> t3

More logs:

[2018-11-19 23:57:01,848] {jobs.py:1404} INFO - Heartbeating the process manager
[2018-11-19 23:57:01,848] {dag_processing.py:559} INFO - Processor for /home/sshuser/airflow/dags/hello-world-dag.py finished
[2018-11-19 23:57:01,848] {dag_processing.py:578} WARNING - Processor for /home/sshuser/airflow/dags/hello-world-dag.py exited with return code 1. See /home/sshuser/airflow/logs/scheduler/2018-11-19/hello-world-dag.py.log for details.
[2018-11-19 23:57:01,851] {dag_processing.py:627} INFO - Started a process (PID: 68567) to generate tasks for /home/sshuser/airflow/dags/hello-world-dag.py - logging into /home/sshuser/airflow/logs/scheduler/2018-11-19/hello-world-dag.py.log
[2018-11-19 23:57:01,852] {jobs.py:1440} INFO - Heartbeating the executor
Process DagFileProcessor7-Process:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 1171, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 799, in create_dag_run
    last_run = dag.get_last_dagrun(session=session)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/airflow/models.py", line 2797, in get_last_dagrun
    last = qry.first()
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2895, in first
    ret = list(self[0:1])
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2687, in __getitem__
    return list(res)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2995, in __iter__
    return self._execute_and_instances(context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 3018, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
    context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 265, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/home/sshuser/airfow_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
ProgrammingError: (pyodbc.ProgrammingError) ('42000', u"[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Incorrect syntax near '0'. (102) (SQLExecDirectW)") [SQL: u'SELECT TOP 1 dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.execution_date AS dag_run_execution_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.conf AS dag_run_conf \nFROM dag_run \nWHERE dag_run.dag_id = ? AND dag_run.external_trigger IS 0 ORDER BY dag_run.execution_date DESC'] [parameters: ('hello-world-dag',)] (Background on this error at: http://sqlalche.me/e/f405)
...