Задание воздушного потока не выполняется по расписанию с помощью PrestoDB Query - PullRequest
0 голосов
/ 05 марта 2019

Я определил пример задачи Airflow, в которой я хотел запустить запрос PrestoDB, а затем задание Spark, чтобы выполнить простой пример подсчета слов.Вот группа DAG, которую я определил:

from pandas import DataFrame
import logging
from datetime import timedelta

from operator import add

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from airflow.hooks.presto_hook import PrestoHook

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    }

dag = DAG(
    'presto_dag',
    default_args=default_args,
    description='A simple tutorial DAG with PrestoDB and Spark',
    # Continue to run DAG once per hour
    schedule_interval='@daily',
)

def talk_to_presto():
    ph = PrestoHook(host='presto.myhost.com', port=9988)

    # Query PrestoDB
    query = "show catalogs"

    # Fetch Data
    data = ph.get_records(query)
    logging.info(data)
    return data

def submit_to_spark():
    # conf = SparkConf().setAppName("PySpark App").setMaster("http://sparkhost.com:18080/")
    # sc = SparkContext(conf)
    # data = sc.parallelize(list("Hello World"))
    # counts = data.map(lambda x: (x, 1)).reduceByKey(add).sortBy(lambda x: x[1], ascending=False).collect()
    # for (word, count) in counts:
    #     print("{}: {}".format(word, count))
    # sc.stop()
    return "Hello"

presto_task = PythonOperator(
    task_id='talk_to_presto',
    provide_context=True,
    python_callable=talk_to_presto,
    dag=dag,
)

spark_task = PythonOperator(
    task_id='submit_to_spark',
    provide_context=True,
    python_callable=submit_to_spark,
    dag=dag,
)

presto_task >> spark_task

Когда я отправляю задачу, около 20 экземпляров DAG остаются в рабочем состоянии: Airflow DAG

Но это никогдазавершается, и журналы не создаются, по крайней мере для запроса PrestoDB.Я могу правильно выполнить тот же запрос PrestoDB из раздела Data Profiling > Ad-Hoc Query в Airflow.

Я намеренно закомментировал код PySpark, поскольку он не работал и не был в центре внимания в вопросе.

У меня два вопроса:

  1. Почему задачи не завершены и остаются в рабочем состоянии?
  2. Что я делаю не так с PrestoHook, поскольку запрос не выполняется?не работает?
...