Прекратить выполнение оставшихся задач в потоке воздуха - PullRequest
0 голосов
/ 26 июня 2018

У меня три задания t1,t2,t3. вывод каждой задачи является следующим вводом задачи, например, t1 вывод является вводом t2. После завершения t1 я получаю пустую выходную папку (что может произойти в моем случае, и это приемлемо и помечено t1 как успешное), но t2 не удалось получить выходные данные t1, поскольку есть файлов нет. Я хочу отметить t2 и t3 как успешные, если нет файлов. Как я могу пропустить следующие две задачи.


Я просмотрел документацию по воздушному потоку, и другие статьи наткнулись на датчики и метод тыка. Но не уверен, как поступить с этим.

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Ответ @ andscoop хорош, но только для того, чтобы принести больше идей:

Возможное решение 1

Я делаю нечто подобное (зависимости A> B> C), и я решил подход, используя XCOM, выдвинутый по умолчанию предыдущей задачей.

Любое значение, которое возвращает метод execute, сохраняется как сообщение Xcom. под ключ return_value. Мы рассмотрим эту тему позже. Источник http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/

# copy&paste it into dags/stackoverflow.py to test it

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime


dag = DAG('stackoverflow', description='Another Dag',
          schedule_interval='* * * 1 1',
          start_date=datetime(2018, 6, 27), catchup=False)


def do_a(**kwargs):
    # Assuming that your TASK A is not returning a value
    return None


task_a = PythonOperator(task_id='do_a',
                        python_callable=do_a,
                        provide_context=True,
                        dag=dag)


def do_b(**kwargs):
    result_from_a = kwargs['ti'].xcom_pull(task_ids='do_a')
    if result_from_a:
        print("Continue with your second task")
    else:
        print("Send a notification somewhere, log something or stop the job here.")


task_b = PythonOperator(task_id='do_b',
                        python_callable=do_b,
                        provide_context=True,
                        dag=dag)
task_a >> task_b

enter image description here enter image description here

Возможное решение 2

ветвление. Более изощренным способом (и с использованием лучших практик) вы можете выполнить ветку, чтобы определить следующий шаг / задачу на основе результата t1. Я не могу сделать правильный пример сейчас, но вот 2 источника, чтобы понять, как это работает с примерами:

0 голосов
/ 26 июня 2018

Вы можете использовать SensorOperator , точнее, FileSensorOperator , чтобы проверить, существует ли файл. Затем вы можете использовать soft_fail arg, чтобы пометить задачи как «пропущенные», когда файл не существует. Это позволит DAG успешно работать, сохраняя при этом правильную историю того, что происходило при проверке файла.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...