DAG не срабатывает с консоли - PullRequest
0 голосов
/ 28 февраля 2020

Я пытался создать образец DAG. Test хорошо работает с командной строкой, но когда я запускаю тот же DAG из консоли, он всегда остается в рабочем состоянии. Вот код ниже для справки. Помощь будет по достоинству оценена. Я действительно не понимаю, как застрять в рабочем состоянии. Кроме того, планировщик воздушного потока не показывает никаких выходных данных, когда я вручную запускаю dag с консоли.

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks import HttpHook
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'flow',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

node_bash = """
pwd
"""


t1 = SSHOperator(
     ssh_conn_id='datalakefs',
     task_id='loadfstodb',
     command=node_bash,
     dag=dag)

t1

А вот и выходные данные тестовой команды.

sudo airflow test flow loadfstodb 28-02-2020
[2020-02-28 10:50:53,602] {settings.py:254} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=13429
[2020-02-28 10:50:54,090] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-02-28 10:50:54,090] {dagbag.py:403} INFO - Filling up the DagBag from /home/airflow/dag
/usr/local/lib/python3.6/dist-packages/airflow/utils/helpers.py:439: DeprecationWarning: Importing 'HttpHook' directly from 'airflow.hooks' has been deprecated. Please import from 'airflow.hooks.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
/usr/local/lib/python3.6/dist-packages/airflow/utils/helpers.py:439: DeprecationWarning: Importing 'PostgresHook' directly from 'airflow.hooks' has been deprecated. Please import from 'airflow.hooks.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
  DeprecationWarning)
[2020-02-28 10:50:54,354] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: flow.loadfstodb 2020-02-28T00:00:00+00:00 [None]>
[2020-02-28 10:50:54,368] {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: flow.loadfstodb 2020-02-28T00:00:00+00:00 [None]>
[2020-02-28 10:50:54,369] {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
[2020-02-28 10:50:54,369] {taskinstance.py:867} INFO - Starting attempt 1 of 2
[2020-02-28 10:50:54,369] {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
[2020-02-28 10:50:54,371] {taskinstance.py:887} INFO - Executing <Task(SSHOperator): loadfstodb> on 2020-02-28T00:00:00+00:00
[2020-02-28 10:50:54,390] {ssh_operator.py:92} INFO - ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.
[2020-02-28 10:50:54,421] {base_hook.py:84} INFO - Using connection to: id: datalakefs. Host: datalake, Port: None, Schema: None, Login: airflow, Password: XXXXXXXX, extra: XXXXXXXX
[2020-02-28 10:50:54,422] {ssh_hook.py:166} WARNING - Remote Identification Change is not verified. This wont protect against Man-In-The-Middle attacks
[2020-02-28 10:50:54,424] {ssh_hook.py:170} WARNING - No Host Key Verification. This wont protect against Man-In-The-Middle attacks
[2020-02-28 10:50:54,431] {transport.py:1572} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-02-28 10:50:54,543] {transport.py:1572} INFO - Authentication (publickey) successful!
[2020-02-28 10:50:54,543] {ssh_operator.py:109} INFO - Running command:
pwd
[2020-02-28 10:50:55,760] {ssh_operator.py:143} INFO - /home/airflow

1 Ответ

0 голосов
/ 28 февраля 2020

Это потому, что планировщик воздушного потока обновляет свои метки с интервалом. И ему нужно некоторое время, чтобы подождать, пока планировщик узнает о присоединении нового метки.

И из-за первоначального запуска он застрял навсегда из-за записи в таблице «работа» postgres. Затем мне пришлось удалить конкретную запись, связанную с Job, и я снова запустил DAG. И это сработало.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...