Я пытаюсь построить конвейер ETL.Первым шагом я получаю некоторые данные из API.Я бы хотел, чтобы этот даг запускался немедленно, когда он вызывается.
python dag.py
Кроме того, после вызова этого скрипта я ожидал увидеть этот ярлык на панели инструментов веб-сервера, но не вижу его.
даг.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pipeline import Pipeline
import asyncio
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('data', default_args=default_args, schedule_interval=timedelta(days=1))
def fetch_user_ids(twitter_handle_name):
pipeline = Pipeline()
twitter_pipeline = pipeline.twitter_pipeline(twitter_handle_name)
asyncio.run(twitter_pipeline.fetch_user_ids())
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=fetch_user_ids,
op_kwargs={'twitter_handle_name': "MENnewsdesk"},
dag=dag,
)
Сейчас я определил только одну задачу, но в будущем у дага будет несколько задач.