Я определил пример задачи Airflow, в которой я хотел запустить запрос PrestoDB, а затем задание Spark, чтобы выполнить простой пример подсчета слов.Вот группа DAG, которую я определил:
from pandas import DataFrame
import logging
from datetime import timedelta
from operator import add
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.presto_hook import PrestoHook
default_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'presto_dag',
default_args=default_args,
description='A simple tutorial DAG with PrestoDB and Spark',
# Continue to run DAG once per hour
schedule_interval='@daily',
)
def talk_to_presto():
ph = PrestoHook(host='presto.myhost.com', port=9988)
# Query PrestoDB
query = "show catalogs"
# Fetch Data
data = ph.get_records(query)
logging.info(data)
return data
def submit_to_spark():
# conf = SparkConf().setAppName("PySpark App").setMaster("http://sparkhost.com:18080/")
# sc = SparkContext(conf)
# data = sc.parallelize(list("Hello World"))
# counts = data.map(lambda x: (x, 1)).reduceByKey(add).sortBy(lambda x: x[1], ascending=False).collect()
# for (word, count) in counts:
# print("{}: {}".format(word, count))
# sc.stop()
return "Hello"
presto_task = PythonOperator(
task_id='talk_to_presto',
provide_context=True,
python_callable=talk_to_presto,
dag=dag,
)
spark_task = PythonOperator(
task_id='submit_to_spark',
provide_context=True,
python_callable=submit_to_spark,
dag=dag,
)
presto_task >> spark_task
Когда я отправляю задачу, около 20 экземпляров DAG остаются в рабочем состоянии:
Но это никогдазавершается, и журналы не создаются, по крайней мере для запроса PrestoDB.Я могу правильно выполнить тот же запрос PrestoDB из раздела Data Profiling > Ad-Hoc Query
в Airflow.
Я намеренно закомментировал код PySpark, поскольку он не работал и не был в центре внимания в вопросе.
У меня два вопроса:
- Почему задачи не завершены и остаются в рабочем состоянии?
- Что я делаю не так с
PrestoHook
, поскольку запрос не выполняется?не работает?