I was trying to create a sample DAG. It works fine with the airflow test command line, but when I trigger the same DAG from the console it always stays in the running state. The code is below for reference. Any help would be appreciated. I really don't understand how it gets stuck in the running state. Also, the Airflow scheduler doesn't show any output when I manually trigger the DAG from the console.
from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.hooks import HttpHook
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'flow',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

node_bash = """
pwd
"""

t1 = SSHOperator(
    ssh_conn_id='datalakefs',
    task_id='loadfstodb',
    command=node_bash,
    dag=dag)

t1
And here is the output of the test command.
sudo airflow test flow loadfstodb 28-02-2020
[2020-02-28 10:50:53,602] {settings.py:254} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=13429
[2020-02-28 10:50:54,090] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-02-28 10:50:54,090] {dagbag.py:403} INFO - Filling up the DagBag from /home/airflow/dag
/usr/local/lib/python3.6/dist-packages/airflow/utils/helpers.py:439: DeprecationWarning: Importing 'HttpHook' directly from 'airflow.hooks' has been deprecated. Please import from 'airflow.hooks.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
/usr/local/lib/python3.6/dist-packages/airflow/utils/helpers.py:439: DeprecationWarning: Importing 'PostgresHook' directly from 'airflow.hooks' has been deprecated. Please import from 'airflow.hooks.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
DeprecationWarning)
[2020-02-28 10:50:54,354] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: flow.loadfstodb 2020-02-28T00:00:00+00:00 [None]>
[2020-02-28 10:50:54,368] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: flow.loadfstodb 2020-02-28T00:00:00+00:00 [None]>
[2020-02-28 10:50:54,369] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-02-28 10:50:54,369] {taskinstance.py:867} INFO - Starting attempt 1 of 2
[2020-02-28 10:50:54,369] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-02-28 10:50:54,371] {taskinstance.py:887} INFO - Executing <Task(SSHOperator): loadfstodb> on 2020-02-28T00:00:00+00:00
[2020-02-28 10:50:54,390] {ssh_operator.py:92} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.
[2020-02-28 10:50:54,421] {base_hook.py:84} INFO - Using connection to: id: datalakefs. Host: datalake, Port: None, Schema: None, Login: airflow, Password: XXXXXXXX, extra: XXXXXXXX
[2020-02-28 10:50:54,422] {ssh_hook.py:166} WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks
[2020-02-28 10:50:54,424] {ssh_hook.py:170} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks
[2020-02-28 10:50:54,431] {transport.py:1572} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-02-28 10:50:54,543] {transport.py:1572} INFO - Authentication (publickey) successful!
[2020-02-28 10:50:54,543] {ssh_operator.py:109} INFO - Running command:
pwd
[2020-02-28 10:50:55,760] {ssh_operator.py:143} INFO - /home/airflow
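
For completeness, this is roughly how I start the scheduler and trigger the run outside of airflow test (standard Airflow 1.10 CLI commands; my exact invocation may differ slightly, and sometimes I use the trigger button in the web UI instead):

airflow unpause flow       # make sure the DAG is not paused
airflow trigger_dag flow   # create a manual DagRun
airflow scheduler          # scheduler is running in a separate terminal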