Я пытаюсь создать группу обеспечения доступности баз данных, которая генерирует задачи динамически на основе файла JSON, расположенного в хранилище. Я следовал этому руководству шаг за шагом:
https://bigdata-etl.com/apache-airflow-create-dynamic-dag/
Но DAG застревает со следующим сообщением:
Можно ли прочитать внешний файл и использовать его для динамического создания задач в Composer? Я могу сделать это, когда я читаю данные только из переменной воздушного потока, но когда я читаю внешний файл, метка застревает в состоянии isn't available in the web server's DagBag object
. Мне нужно читать из внешнего файла, так как содержимое JSON будет меняться при каждом выполнении.
Я использую composer-1.8.2-airflow-1.10.2
.
Я прочитал ответ на аналогичный вопрос:
Dynami c определение задачи в Airflow
Но я не пытаюсь создавать задачи на основе отдельной задачи, только на основе внешнего файла.
Это мой второй подход, который также застревает в этом состоянии ошибки:
import datetime
import airflow
from airflow.operators import bash_operator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import Variable
import json
import os
products = json.loads(Variable.get("products"))
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2020, 1, 10),
}
with airflow.DAG(
'json_test2',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
def read_json_file(file_path):
if os.path.exists(file_path):
with open(file_path, 'r') as f:
return json.load(f)
def get_run_list(files):
run_list = []
#The file is uploaded in the storage bucket used as a volume by Composer
last_exec_json = read_json_file("/home/airflow/gcs/data/last_execution.json")
date = last_exec_json["date"]
hour = last_exec_json["hour"]
for file in files:
#Testing by adding just date and hour
name = file['name']+f'_{date}_{hour}'
run_list.append(name)
return run_list
rl = get_run_list(products)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
for name in rl:
tsk = DummyOperator(task_id=name, dag=dag)
start >> tsk >> end