Как вызвать метку, чтобы запустить сразу - PullRequest
0 голосов
/ 06 июня 2019

Я пытаюсь построить конвейер ETL.Первым шагом я получаю некоторые данные из API.Я бы хотел, чтобы этот даг запускался немедленно, когда он вызывается.

python dag.py

Кроме того, после вызова этого скрипта я ожидал увидеть этот ярлык на панели инструментов веб-сервера, но не вижу его.

даг.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pipeline import Pipeline
import asyncio

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('data', default_args=default_args, schedule_interval=timedelta(days=1))

def fetch_user_ids(twitter_handle_name):
    pipeline = Pipeline()
    twitter_pipeline = pipeline.twitter_pipeline(twitter_handle_name)
    asyncio.run(twitter_pipeline.fetch_user_ids())

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=fetch_user_ids,
    op_kwargs={'twitter_handle_name': "MENnewsdesk"},
    dag=dag,
)

Сейчас я определил только одну задачу, но в будущем у дага будет несколько задач.

1 Ответ

0 голосов
/ 06 июня 2019

Команда python dag.py только проверяет код, который не собирается запускать dag. Если вы хотите запустить dag в веб-сервере, вам нужно поместить файл dag.py в каталог 'dag'. Airflow автоматически считывает файл из каталога dag, загружает dag на веб-сервер и запускает его в соответствии с датой start_date, определенной в defaults_args. Так как ваша начальная дата (2015, 6, 1) и интервал планирования составляет 1 день, в этом случае поток воздуха будет создавать одну задачу на каждый день до текущей даты. Поэтому я думаю, что вам нужно изменить start_date.

Если вы хотите запустить этот даг вручную, вам нужно установить scheduling_interval = None и использовать airflow trigger_dag dag_id (Документация: триггер воздушного потока )

Спасибо.

...