Возможно ли иметь в Airflow конвейер, который не привязан ни к какому графику? - PullRequest
0 голосов
/ 24 мая 2018

Мне нужно иметь конвейер, который будет выполняться вручную или программно, возможно с Airflow?Похоже, прямо сейчас каждый рабочий процесс ДОЛЖЕН быть привязан к расписанию.

Ответы [ 3 ]

0 голосов
/ 24 мая 2018

В потоке воздуха каждый DAG должен иметь дату начала и интервал расписания *, например, ежечасно:

import datetime

dag = DAG(
    dag_id='my_dag',
    schedule_interval=datetime.timedelta(hours=1),
    start_date=datetime(2018, 5, 23),
)

(без расписания, как узнать, когда запускать?)

В дополнение к расписанию cron вы можете установить расписание на @once для запуска только один раз.

* Единственное исключение: вы можете опустить расписание для внешних групп DAG потому что Airflow не будет их сам планировать.

Тем не менее, при этом, если вы опустите расписание, вам нужно каким-то образом вызвать DAG извне.Если вы хотите иметь возможность вызывать группу DAG программно, например, в результате отдельного условия, возникающего в другой группе DAG, вы можете сделать это с помощью TriggerDagRunOperator .Вы также можете услышать эту идею, называемую внешними группами DAG.

Вот пример использования из примеров групп DAG с воздушным потоком:

Файл 1 - example_trigger_controller_dag.py :

"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
          default_args={"owner": "airflow",
                        "start_date": datetime.utcnow()},
          schedule_interval='@once')


# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="example_trigger_target_dag",
                                python_callable=conditionally_trigger,
                                params={'condition_param': True,
                                        'message': 'Hello World'},
                                dag=dag)

Файл 2 - example_trigger_target_dag.py :

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

import pprint
pp = pprint.PrettyPrinter(indent=4)

# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#    (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_trigger_target_dag',
    default_args=args,
    schedule_interval=None)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)


# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)
0 голосов
/ 24 мая 2018

Да, это может быть достигнуто путем передачи None в schedule_interval в default_args.

Проверьте эту документацию по прогону DAG.

Например:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': None, # Check this line 
}
0 голосов
/ 24 мая 2018

Просто установите schedule_interval на None при создании группы обеспечения доступности баз данных:

dag = DAG('workflow_name',
          template_searchpath='path',
          schedule_interval=None,
          default_args=default_args)

Из Руководство по воздушному потоку :

Каждая группа обеспечения доступности баз данныхможет иметь или не иметь расписание, которое информирует о том, как создаются прогоны DAG.schedule_interval определяется как аргументы DAG и предпочтительно получает выражение cron в виде объекта str или datetime.timedelta.

Далее в руководстве перечисляются некоторые «пресеты» cron, одним из которых являетсяNone.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...