Как вызвать DAG воздушного потока для немедленного запуска из пользовательского интерфейса (не по расписанию) - PullRequest
0 голосов
/ 03 марта 2020

У меня есть DAG с воздушным потоком, который выглядит следующим образом:

from airflow import DAG

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta


args = {
    'owner': 'john',
    'start_date': datetime(2020, 3, 01)
}
dag = DAG('spark_example', default_args=args)

operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',
    java_class='com.om.example.data',
    application='/home/ubuntu/my-jar-1.0.jar',
    total_executor_cores='1',
    executor_cores='1',
    executor_memory='1g',
    num_executors='2',
    name='airflow-spark-code',
    verbose=False,
    driver_memory='1g',
    application_args=["yarn", "172.168.1.23:9092", "kafka_group_1"],
    dag=dag,
)

Два вопроса:

  1. Как запустить этот DAG немедленно? Я не хочу планировать это, но не уверен, какие параметры мне нужны. Единственный способ, которым он работает из commandLine, - это если я сделаю: тест воздушного потока spark_example spark_submit_job -1

и ...

Из Airflow UI на 0.0.0.0:8080, как мне сразу запустить этот DAG ... когда я нажимаю «play», он ничего не делает, но говорит, что работает и остается в этом состоянии навсегда

1 Ответ

0 голосов
/ 14 марта 2020

Вы можете использовать оператор only only для запуска вашей группы DAG в любое время.

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...