У меня есть DAG с воздушным потоком, который выглядит следующим образом:
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta
args = {
'owner': 'john',
'start_date': datetime(2020, 3, 01)
}
dag = DAG('spark_example', default_args=args)
operator = SparkSubmitOperator(
task_id='spark_submit_job',
conn_id='spark_default',
java_class='com.om.example.data',
application='/home/ubuntu/my-jar-1.0.jar',
total_executor_cores='1',
executor_cores='1',
executor_memory='1g',
num_executors='2',
name='airflow-spark-code',
verbose=False,
driver_memory='1g',
application_args=["yarn", "172.168.1.23:9092", "kafka_group_1"],
dag=dag,
)
Два вопроса:
- Как запустить этот DAG немедленно? Я не хочу планировать это, но не уверен, какие параметры мне нужны. Единственный способ, которым он работает из commandLine, - это если я сделаю: тест воздушного потока spark_example spark_submit_job -1
и ...
Из Airflow UI на 0.0.0.0:8080, как мне сразу запустить этот DAG ... когда я нажимаю «play», он ничего не делает, но говорит, что работает и остается в этом состоянии навсегда